JUC并发编程

Posted by Beyonderwei on 2020-07-28
Words 6.2k and Reading Time 27 Minutes
Viewed Times

JUC并发编程

  1. 高内聚低耦合,线程操作资源类
  2. 生产者、消费者:判断,干活,通知
  3. 多线程交互(wait notify时)中要防止多线程的虚假唤醒条件的判断不能用if,只能用while。while循环体在线程被终止的时候会被重新判断而if不会被重新判断,即wait必须在while的循环体内。
  4. 标志位

知识点

  • synchronized 锁的不是某个方法,而是整个资源类,即当前的这个对象。
  • synchronized 标识的方法为static时,那么先执行的线程会锁住这个对象的类。该类的其他对象依然不能调用被synchronized标识的static方法。

一、JUC介绍

Java util concurrent :Java的并发工具包,主要包含下面三部分。

  • java.util.concurrent
  • java.util.concurrent.atomic
  • java.util.concurrent.locks

进程:

是具有一定独立功能的程序,分配资源的基本单位,是操作系统动态执行的基本单元。

线程:

是独立运行和独立调度的基本单位,可以利用进程所拥有的资源。

二、Lock接口

高内聚低耦合,线程操作资源类

​ 由java.util.concurrent.locks提供,相较于 synchronized 可以只锁一部分资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Ticket{
private int ticketCount = 30;
private Lock lock = new ReentrantLock(); // 可重用锁

public void saleTicket(){
// 下面抽成代码块 trylock
lock.lock();
try{
if (ticketCount > 0){
System.out.println(Thread.currentThread().getName() +
"\t卖出第:" + (ticketCount--) +
"\t还剩下:" + ticketCount); // 打出当前进程的名字
}
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

三、线程间通信

​ 生产者、消费者:判断,干活,通知

​ 多线程交互(wait notify时)中要防止多线程的虚假唤醒()条件的判断不能用if,只能用while。while循环体在线程被终止的时候会被重新判断而if不会被重新判断。

  • wait() 会交出资源的控制权

  • sleep() 不会交出资源的控制权,死死的等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class AirConditioner{

private int number = 0;

public synchronized void increment() throws InterruptedException {
// 1. 判断
while (number != 0){
this.wait();
}
// 2. 干活
number ++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3. 通知
this.notifyAll();
}

public synchronized void decrement() throws InterruptedException {
// 1. 判断
while (number == 0){
this.wait();
}
// 2. 干活
number --;
System.out.println(Thread.currentThread().getName() + "\t" + number);
// 3. 通知
this.notifyAll();
}
}

四、生产者与消费者

​ 采用新的写法,使用新版本的await 和 signalAll 来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class AirConditione {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try{
while (number != 0){
condition.await();
}
number ++;
System.out.println(Thread.currentThread().getName() + "\t" + number);
condition.signalAll();
}catch(Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
public void decrement() {
lock.lock();
try{
while(number == 0){
condition.await();
}
number --;
System.out.println(Thread.currentThread().getName() + "\t" + number);
condition.signalAll();
}catch(Exception e) {
e.printStackTrace();
}finally{
lock.unlock();
}
}
}

public class ProductCustom {
public static void main(String[] args) {
AirConditione airConditione = new AirConditione();
new Thread(() -> {
for (int i = 0; i <= 10; i++) {
airConditione.increment();
}
}, "A").start();

new Thread(() -> {
for (int i = 0; i <= 10; i++) {
airConditione.decrement();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i <= 10; i++) {
airConditione.increment();
}
}, "C").start();

new Thread(() -> {
for (int i = 0; i <= 10; i++) {
airConditione.decrement();
}
}, "D").start();
}
}

五、精准通知和顺序访问

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
* 精准通知和顺序访问
* 多线程之间按照顺序调度,实现 A -> B ->C
* A 打印5次后 B打印10次 C再打印15次
* 上述过程打印10轮
*/
class Resource {
int flag = 1; // 1 代表A 2 代表B 3代表C

private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();

public void print5() throws InterruptedException {
lock.lock();
try{
// 判断
while (flag != 1){
conditionA.await();
}
// 干活
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 改标志
flag = 2;
// 通知
conditionB.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public void print10() throws InterruptedException {
lock.lock();
try{
// 判断
while (flag != 2){
conditionB.await();
}
// 干活
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 改标志
flag = 3;
// 通知
conditionC.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public void print15() throws InterruptedException {
lock.lock();
try{
// 判断
while (flag != 3){
conditionC.await();
}
// 干活
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
// 改标志
flag = 1;
// 通知
conditionA.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

public class AccurateNotification {

public static void main(String[] args) {
Resource resource = new Resource();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
resource.print5();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "A").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
resource.print10();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();

new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
resource.print15();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
}

}

六、 八种锁的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/*
1 标准访问,先打印短信还是邮件
2 停4秒在短信方法内,先打印短信还是邮件
3 普通的hello方法,是先打短信还是hello
4 现在有两部手机,先打印短信还是邮件
5 两个静态同步方法,1部手机,先打印短信还是邮件
6 两个静态同步方法,2部手机,先打印短信还是邮件
7 1个静态同步方法,1个普通同步方法,1部手机,先打印短信还是邮件
8 1个静态同步方法,1个普通同步方法,2部手机,先打印短信还是邮件
*/
class Phone{
public static synchronized void sendEmail() throws InterruptedException {
try {
TimeUnit.SECONDS.sleep(4); // 相当于 Thread.sleep(4000);
}catch (Exception e){
e.printStackTrace();
}
System.out.println("------发送Email----");
}

public static synchronized void sendSMS(){
System.out.println("------发送SMS----");
}

public void hello(){
System.out.println("------hello----");
}
}

public class lock8 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();

new Thread(() -> {
try {
phone1.sendEmail();
}catch (Exception e){
e.printStackTrace();
}
}, "A").start();

new Thread(() -> {
try {
// phone1.sendSMS();
// phone1.hello();
// phone2.hello();
}catch (Exception e){
e.printStackTrace();
}
}, "B").start();
}
}
  • 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法

  • 加个普通方法后发现和同步锁无关换成两个对象后,不是同一把锁了,情况立刻变化。

  • synchronized实现同步的基础:Java中的每一个对象都可以作为锁。具体表现为以下3种形式。

    • 对于普通同步方法,锁是当前实例对象。
    • 对于静态同步方法,锁是当前类的Class对象。
    • 对于同步方法块,锁是Synchonized括号里配置的对象
  • 当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,
    所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。

  • 所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁(对象和类)是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!

七、 集合类不安全(ArrayList)

  • ArrayList HashSet HashMap 都是线程不安全的

  • 会出现并发修改异常

  • CopyOnWriteArrayList 同时满足写和读,做到线程安全、且性能好。写时复制,相当于读写分离的思想。

  • CopyOnWriteArrayList 的 add 方法 底层源码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    Object[] elements = getArray();
    int len = elements.length;
    Object[] newElements = Arrays.copyOf(elements, len + 1);
    newElements[len] = e;
    setArray(newElements);
    return true;
    } finally {
    lock.unlock();
    }
    }
  • 知识点:ArrayList扩容为原来的一半。

八、HashSet不安全

1
2
3
4
5
6
7
8
9
10
11
public class HashSetNotSecurity {
public static void main(String[] args) {
Set<String> set = new CopyOnWriteArraySet<>(); // Collections.synchronizedMap(new HashSet<>()); // new HashSet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}

知识点:

  • HashSet的底层就是HashMap,new一个HashSet实际上就是new 了一个HashMap,HashSet的add方法调用的就是HashMap的put方法,此时键是add进去的值,value是一个固定不变的Object对象。

九、 HashMap不安全

1
2
3
4
5
6
7
8
9
10
11
12
public class HashMapNotSecurity {
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<>(); // Collections.synchronizedMap(new HashMap<>()); // new HashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
}, String.valueOf(i)).start();
}
}

}
  • 知识点:Map的底层是Node类型的数组+Node类型的链表(单向链表,hash冲突时出现(key不同但是hash相同,key相同时会进行值的替换),当单向链表大于8时,就变成了红黑树,为了快速查找)+红黑树。
    • 默认的初始大小为16,负载因子为0.75,一般只会改初始值(优化,避免频繁扩容)。(到12以后自动扩容,扩容一倍到32)
    • 数组中存放的是一个Nod节点。将KV存放到Node中再放到数组里。

十、Callable

1. Java中实现多线程的几种方法

  • 继承Thread类,实现run方法
  • 实现Runnable 接口,实现run方法
  • 实现Callable \接口,实现call方法 (有返回值、有异常、call方法)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class MyThread implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println("hahahaha");
Thread.sleep(4000);
return 1001;
}
}

/**
* get 方法一般放在最后一行来执行,等待主线程任务完成再去看其他线程的结果
* 不然会出现,一直等待其他线程的情况
* 不论多少个线程调用futureTask 该任务只会执行一次
*/

public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(new MyThread());
// 下面两个线程只会执行一次
new Thread(futureTask, "A").start();
new Thread(futureTask, "B").start();
System.out.println(futureTask.get()); // 可以获得线程的返回值
}
}

十、 CountDownLatch

等待所有的线程执行完,再执行主线程最后的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行结束。");
countDownLatch.countDown(); // 每次执行完,计数器减一
}, String.valueOf(i)).start();
}
// 等待countDownLatch 减到0的时候才能执行
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "主线程执行结束。");
}
}

十一、CyclicBarrier(循环屏障)

等到某些任务都执行完之后去执行另一个任务。(所有人到场了才开会的场景)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CyclicBarrierDemo {

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {
System.out.println("开始开会");
});

for (int i = 0; i < 7; i++) {
int finalI = i;
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "第" + finalI + "个人来了");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}

十二、Semaphore(信号量)

信号量主要用于多个共享资源的互斥使用(n个线程抢m个资源 n>m) 或 控制多线程的并发数。

  • acquire 当前线程调用acquire 操作时,它要么成功获取信号量,此时信号量会减一,要么一直等下去,直到有线程释放信号量,或超时。
  • release 实际上是将信号量的值加一,然后唤醒等待的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3); // 模拟资源类,有3个资源

for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 占有一个资源 相当于 permits减一
System.out.println(Thread.currentThread().getName() + "抢到资源");
try{ TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) {e.printStackTrace();}
System.out.println(Thread.currentThread().getName() + "释放了资源");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release(); // 释放资源
}
}, String.valueOf(i)).start();
}
}
}

十三、ReadWriteLock

读-读可共享,写-读、写-写要独占。

  • 多个线程同时读一个资源类没有问题,所以为了满足并发量,读取共享资源应该可以同时进行。

  • 但如果有一个线程想去写共享资源时,就不应该再有其他线程可以对资源进行读或写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

public void put(String key, Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写入数据");
// 模拟网络延时
try{ TimeUnit.MILLISECONDS.sleep(100); } catch(InterruptedException e) {e.printStackTrace();}
map.put(key,value);
System.out.println(Thread.currentThread().getName() + "写入完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}

public void get(String key){
readWriteLock.readLock().lock();
try {
System.out.println("读取数据");
// 模拟网络延时
try{ TimeUnit.MILLISECONDS.sleep(150); } catch(InterruptedException e) {e.printStackTrace();}
map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}

public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();

for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
myCache.put(finalI + "", finalI + ""); // 5个线程同时写入数据
},String.valueOf(i)).start();
}

for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> {
myCache.get(finalI + ""); // 5个线程同时写入数据
},String.valueOf(i)).start();
}
}
}

十四、BlockingQueue(阻塞队列)

必须要阻塞和不得不阻塞的时候使用。

在多线程中,就是在有些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被自动唤起。BlockingQueue帮我们完成了什么时候需要阻塞线程、什么时候需要唤醒线程。concurrent发布之前,这些需要自己实现。

  • 当队列是空的时候,从队列中获取元素的操作就会被阻塞,直到其他线程往空的队列中加入新的元素。
  • 当队列是满的,向队列中添加元素的操作就会被阻塞,直到其他线程从队列中移除一个或多个元素。

ArrayBlockingQueue:由数组结构组成的有界阻塞队列

LinkedBlockingQueue:由链表组成的有界(但大小默认为integer.MAX_VALUE)阻塞队列

SynchronousQueue:不存储运输的阻塞队列,即单个元素的队列。

LinkedBlockingdeque:由链表组成的双向阻塞队列。

四组方法:

方法类型 抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
查看 element() peek()

说明:

情况 情况说明
抛出异常 当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full
当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
特殊值 插入方法,成功ture失败false移除方法,
成功返回出队列的元素,队列里没有就返回null
一直阻塞 当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用
超时退出 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); // 大小为3的队列

/*--第一组 add remove element--*/
/*// 因为是数组,因此可以add相同的值
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
// System.out.println(blockingQueue.add("d")); // 异常

System.out.println(blockingQueue.remove());
System.out.println("当前队列的头:" + blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
// System.out.println(blockingQueue.remove()); // 异常*/


/*--第二组 offer poll peek--*/
/*System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
// System.out.println(blockingQueue.offer("d")); // false

System.out.println(blockingQueue.poll());
System.out.println("当前队列的头:" + blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
// System.out.println(blockingQueue.poll()); // null*/

/*--第三组 put take--*/
/*blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
// blockingQueue.put("d"); // 阻塞

System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
// System.out.println(blockingQueue.take()); // 阻塞*/


/*--第四组 put take--*/
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("a", 3, TimeUnit.SECONDS)); // 超时不候

System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS)); // 超时不候
}
}

十五、ThreadPool(线程池)

线程池控制线程的数量,处理过程中将任务放入队列,然后在线程创建之后启这些任务,如果线程数量超出了最大数量,超出数量的线程排队等候,等待其他线程执行完毕,再从队列中取出任务来执行。线程是稀缺资源,如果无限制的创建,不仅仅会消耗系统资源,还会降低系统稳定性,用线程池可以进行统一的分配、调优和监控。

主要特点:线程复用、控制最大并发量、管理线程。

Executors:线程池工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ThreadPoolDemo {
public static void main(String[] args) {
// 创建线程池
// ExecutorService threadPool = Executors.newFixedThreadPool(5); // 一个线程池中有5线程
// ExecutorService threadPool = Executors.newSingleThreadPool(); // 一个线程池中有一个线程
ExecutorService threadPool = Executors.newCachedThreadPool(); // 一个线程池有N个线程 自动根据任务增加或者减少线程

try {
for (int i = 0; i < 10; i++) {
// 创建执行任务
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() +"办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown(); // 销毁线程池
}
}
}

1. 底层源码

实际上上述三种方法调用的是同一个方法,返回了一个ThreadPoolExecutor对象。

  • Executors.newFixedThreadPool(nThreads)

    1
    2
    3
    4
    5
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • Executors.newSingleThreadExecutor()

    1
    2
    3
    4
    5
    6
    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
  • Executors.newCachedThreadPool()

    1
    2
    3
    4
    5
    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }

2. 线程池的7个参数

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
  • corePoolSize:线程池的常驻线程数
  • maximumPoolSize:线程池能够容纳同时执行的最大线程数,>=1
  • keepAliveTime:多余空闲线程的存活时间,当数量超过corePoolSize时,空闲时间超过改值,那么超过corePoolSize数量的线程会被销毁。
  • unit:keepAliveTime的单位
  • workQueue:阻塞任务队列BlockingQueue,被提交但没有执行的任务
  • threadFactory:用于创建线程的线程工厂,默认
  • handler:拒绝策略,当队列中的任务满了并且工作线程大于等于线程池的最大线程数时,如何拒绝请求执行的runnable的策略。

3. 执行流程

4. 实际使用(重要)

实际中不使用Executors去创建,而是通过ThreadPoolExcutor去创建,避免资源被耗尽。

  • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会造成堆积大量请求,导致OOM(out of memory)。
  • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致OOM。
1
2
3
4
5
6
7
8
9
// 该线程池能够承受的最大任务数为 :5 + 3 = 8
ExecutorService threadPool = new ThreadPoolExecutor(
2, // 线程池的常驻线程数
5, // 线程池能够容纳同时执行的最大线程数
3L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3), // 阻塞队列的大小,默认为int.MAX_VALUE
Executors.defaultThreadFactory(), // 使用默认的工厂
new ThreadPoolExecutor.AbortPolicy()); // AbortPolicy拒绝策略(默认)——超出报拒绝执行异常,总共四种

5. 四种拒绝策略

  • AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
  • CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不
    会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中
    尝试再次提交当前任务。
  • DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。
    如果允许任务丢失,这是最好的一种策略。

6. 最大线程数配置方式(重要)

  • CPU密集型任务
    • maximumPoolSize 大小为 电脑的CPU核数加一(不允许写死),通过 Runtime.getRuntime().availableProcessors();来获得核数。
  • IO 密集型任务:
    • maximumPoolSize = CPU的核数 / CPU的阻塞系数。

十六、链式编程 + 流式计算

java中的@Accessors(chain = true) 标识允许以链式的方式书写代码。

JDK中常用的函数式接口:(只要是函数式接口就可以写成 lambda 表达式的形式。)

1. Stream 流

有了 Stream 基础后,数据库的相应排序、筛选等操作即可通过流的方式直接在后台处理数据,只需要通过数据库获得原始数据即可,提高数据库性能。

① 简介:

Stream 是数据渠道,用于操作数据源(集合和数组等)所生成的元素序列,集合讲的是数据,流讲的是计算

② 特点

  • Stream 不会存储元素
  • Stream 不会改变源对象,会返回一个持有结果的新的Stream
  • Stream 操作是延迟进行的,这意味着他们会等到需要结果的时候才执行。

③ 使用

  • 创建:创建一个Stream:一个数据源(数组、集合)
  • 中间操作:一个中间操作,处理数据源数据
  • 终止操作:一个终止操作,执行中间操作链,产生结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Data
class User
{
private Integer id;
private String userName;
private int age;
}

/**
* @create 2019-02-26 22:24
*
* 题目:请按照给出数据,找出同时满足
* 偶数ID且年龄大于24
* 用户名转为大写且用户名字母倒排序
* 最后只输出一个用户名字
*/
public class StreamDemo
{
public static void main(String[] args)
{
User u1 = new User(11,"a",23);
User u2 = new User(12,"b",24);
User u3 = new User(13,"c",22);
User u4 = new User(14,"d",28);
User u5 = new User(16,"e",26);

List<User> list = Arrays.asList(u1,u2,u3,u4,u5);

list.stream().filter(p -> {
return p.getId() % 2 == 0;
}).filter(p -> {
return p.getAge() > 24;
}).map(f -> {
return f.getUserName().toUpperCase();
}).sorted((o1, o2) -> {
return o2.compareTo(o1);
}).limit(1).forEach(System.out::println);

// R apply(T t);
Function<String,Integer> function = t -> {return t.length();};
System.out.println(function.apply("abc"));

// boolean test(T t);
Predicate<String> predicate = t -> {return t.startsWith("a");};
System.out.println(predicate.test("a"));

//void accept(T t);
Consumer<String> consumer = t -> {System.out.println(t);};
consumer.accept("java1018");

// T get();
Supplier<String> supplier = () -> {return UUID.randomUUID().toString();};
System.out.println(supplier.get());

}
}

十七、ForkJoin(分支合并)

将多个复杂任务拆解为多个任务,即fork的过程,多个线程同时计算,计算完以后再将结果合并,即jion的过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 拆分计算0-100
class MyTask extends RecursiveTask<Integer> {
private static final Integer ADJUST_VALUE = 10;
private int begin;
private int end;
private int result;

public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}

@Override
protected Integer compute() {
if((end - begin)<=ADJUST_VALUE){
for(int i =begin;i <= end;i++){
result = result + i;
}
}else{
int middle = (begin + end)/2;
MyTask task01 = new MyTask(begin,middle);
MyTask task02 = new MyTask(middle+1,end);
task01.fork();
task02.fork();
result = task01.join() + task02.join();
}


return result;
}
}


/**
* 分支合并例子
* ForkJoinPool
* ForkJoinTask
* RecursiveTask
*/
public class ForkJoinDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException {

MyTask myTask = new MyTask(0,100);
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);

System.out.println(forkJoinTask.get());

forkJoinPool.shutdown();
}
}

十八、异步回调

像Axios的异步访问一样,启用异步任务后,执行完会进入异步回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class CompletableFutureDemo {

public static void main(String[] args) throws Exception {
//同步,异步,异步回调

//同步
// CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{
// System.out.println(Thread.currentThread().getName()+"\t completableFuture1");
// });
// completableFuture1.get();

//异步回调
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+"\t completableFuture2");
int i = 10/0;
return 1024;
});

completableFuture2.whenComplete((t,u)->{
// 当任务完成时进入
System.out.println("-------t="+t);
System.out.println("-------u="+u);
}).exceptionally(f->{
// 当任务出现异常时进入
System.out.println("-----exception:"+f.getMessage());
return 444;
}).get();

}
}

本文为作者原创文章,未经作者允许不得转载。

...

...

00:00
00:00