JUC并发编程
高内聚低耦合,线程操作资源类
生产者、消费者:判断,干活,通知
多线程交互(wait notify时)中要防止多线程的虚假唤醒 。条件的判断不能用if,只能用while。 while循环体在线程被终止的时候会被重新判断而if不会被重新判断,即wait必须在while的循环体内。
标志位
知识点
synchronized 锁的不是某个方法,而是整个资源类,即当前的这个对象。
当 synchronized 标识的方法为static时,那么先执行的线程会锁住这个对象的类。该类的其他对象依然不能调用被synchronized标识的static方法。
一、JUC介绍 Java util concurrent :Java的并发工具包,主要包含下面三部分。
java.util.concurrent
java.util.concurrent.atomic
java.util.concurrent.locks
进程:
是具有一定独立功能的程序,分配资源的基本单位,是操作系统动态执行的基本单元。
线程:
是独立运行和独立调度的基本单位,可以利用进程所拥有的资源。
二、Lock接口 高内聚低耦合,线程操作资源类 。
由java.util.concurrent.locks提供,相较于 synchronized 可以只锁一部分资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Ticket { private int ticketCount = 30 ; private Lock lock = new ReentrantLock(); public void saleTicket () { lock.lock(); try { if (ticketCount > 0 ){ System.out.println(Thread.currentThread().getName() + "\t卖出第:" + (ticketCount--) + "\t还剩下:" + ticketCount); } }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } }
三、线程间通信 生产者、消费者:判断,干活,通知 。
多线程交互(wait notify时)中要防止多线程的虚假唤醒() 。条件的判断不能用if,只能用while。while循环体在线程被终止的时候会被重新判断而if不会被重新判断。
wait() 会交出资源的控制权
sleep() 不会交出资源的控制权,死死的等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class AirConditioner { private int number = 0 ; public synchronized void increment () throws InterruptedException { while (number != 0 ){ this .wait(); } number ++; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } public synchronized void decrement () throws InterruptedException { while (number == 0 ){ this .wait(); } number --; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } }
四、生产者与消费者 采用新的写法,使用新版本的await 和 signalAll 来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 class AirConditione { private int number = 0 ; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment () { lock.lock(); try { while (number != 0 ){ condition.await(); } number ++; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } public void decrement () { lock.lock(); try { while (number == 0 ){ condition.await(); } number --; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); }catch (Exception e) { e.printStackTrace(); }finally { lock.unlock(); } } } public class ProductCustom { public static void main (String[] args) { AirConditione airConditione = new AirConditione(); new Thread(() -> { for (int i = 0 ; i <= 10 ; i++) { airConditione.increment(); } }, "A" ).start(); new Thread(() -> { for (int i = 0 ; i <= 10 ; i++) { airConditione.decrement(); } }, "B" ).start(); new Thread(() -> { for (int i = 0 ; i <= 10 ; i++) { airConditione.increment(); } }, "C" ).start(); new Thread(() -> { for (int i = 0 ; i <= 10 ; i++) { airConditione.decrement(); } }, "D" ).start(); } }
五、精准通知和顺序访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 class Resource { int flag = 1 ; private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); private Condition conditionC = lock.newCondition(); public void print5 () throws InterruptedException { lock.lock(); try { while (flag != 1 ){ conditionA.await(); } for (int i = 0 ; i < 5 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } flag = 2 ; conditionB.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print10 () throws InterruptedException { lock.lock(); try { while (flag != 2 ){ conditionB.await(); } for (int i = 0 ; i < 10 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } flag = 3 ; conditionC.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } public void print15 () throws InterruptedException { lock.lock(); try { while (flag != 3 ){ conditionC.await(); } for (int i = 0 ; i < 15 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } flag = 1 ; conditionA.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } } } public class AccurateNotification { public static void main (String[] args) { Resource resource = new Resource(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { try { resource.print5(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A" ).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { try { resource.print10(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B" ).start(); new Thread(() -> { for (int i = 0 ; i < 10 ; i++) { try { resource.print15(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C" ).start(); } }
六、 八种锁的情况 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class Phone { public static synchronized void sendEmail () throws InterruptedException { try { TimeUnit.SECONDS.sleep(4 ); }catch (Exception e){ e.printStackTrace(); } System.out.println("------发送Email----" ); } public static synchronized void sendSMS () { System.out.println("------发送SMS----" ); } public void hello () { System.out.println("------hello----" ); } } public class lock8 { public static void main (String[] args) { Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { try { phone1.sendEmail(); }catch (Exception e){ e.printStackTrace(); } }, "A" ).start(); new Thread(() -> { try { }catch (Exception e){ e.printStackTrace(); } }, "B" ).start(); } }
一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
加个普通方法后发现和同步锁无关换成两个对象后,不是同一把锁了,情况立刻变化。
synchronized实现同步的基础:Java中的每一个对象都可以作为锁。具体表现为以下3种形式。
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的Class对象。
对于同步方法块,锁是Synchonized括号里配置的对象
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁, 所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁(对象和类)是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
七、 集合类不安全(ArrayList)
ArrayList HashSet HashMap 都是线程不安全的
会出现并发修改异常
CopyOnWriteArrayList 同时满足写和读,做到线程安全、且性能好。写时复制,相当于读写分离的思想。
CopyOnWriteArrayList 的 add 方法 底层源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean add (E e) { final ReentrantLock lock = this .lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } finally { lock.unlock(); } }
知识点:ArrayList扩容为原来的一半。
八、HashSet不安全 1 2 3 4 5 6 7 8 9 10 11 public class HashSetNotSecurity { public static void main (String[] args) { Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 0 ; i < 30 ; i++) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(set); }, String.valueOf(i)).start(); } } }
知识点:
HashSet的底层就是HashMap,new一个HashSet实际上就是new 了一个HashMap,HashSet的add方法调用的就是HashMap的put方法,此时键是add进去的值,value是一个固定不变的Object对象。
九、 HashMap不安全 1 2 3 4 5 6 7 8 9 10 11 12 public class HashMapNotSecurity { public static void main (String[] args) { Map<String, String> map = new ConcurrentHashMap<>(); for (int i = 0 ; i < 30 ; i++) { new Thread(() -> { map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(map); }, String.valueOf(i)).start(); } } }
知识点:Map的底层是Node类型的数组+Node类型的链表(单向链表,hash冲突时出现(key不同但是hash相同,key相同时会进行值的替换),当单向链表大于8时,就变成了红黑树,为了快速查找)+红黑树。
默认的初始大小为16,负载因子为0.75,一般只会改初始值(优化,避免频繁扩容)。(到12以后自动扩容,扩容一倍到32)
数组中存放的是一个Nod节点。将KV存放到Node中再放到数组里。
十、Callable 1. Java中实现多线程的几种方法
继承Thread类,实现run方法
实现Runnable 接口,实现run方法
实现Callable \接口,实现call方法 (有返回值、有异常、call方法)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class MyThread implements Callable <Integer > { @Override public Integer call () throws Exception { System.out.println("hahahaha" ); Thread.sleep(4000 ); return 1001 ; } } public class CallableDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask(new MyThread()); new Thread(futureTask, "A" ).start(); new Thread(futureTask, "B" ).start(); System.out.println(futureTask.get()); } }
十、 CountDownLatch 等待所有的线程执行完,再执行主线程最后的部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6 ); for (int i = 0 ; i < 6 ; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + "执行结束。" ); countDownLatch.countDown(); }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "主线程执行结束。" ); } }
十一、CyclicBarrier(循环屏障) 等到某些任务都执行完之后去执行另一个任务。(所有人到场了才开会的场景)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class CyclicBarrierDemo { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7 ,() -> { System.out.println("开始开会" ); }); for (int i = 0 ; i < 7 ; i++) { int finalI = i; new Thread(() -> { System.out.println(Thread.currentThread().getName() + "第" + finalI + "个人来了" ); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } } }
十二、Semaphore(信号量) 信号量主要用于多个共享资源的互斥使用(n个线程抢m个资源 n>m) 或 控制多线程的并发数。
acquire 当前线程调用acquire 操作时,它要么成功获取信号量,此时信号量会减一,要么一直等下去,直到有线程释放信号量,或超时。
release 实际上是将信号量的值加一,然后唤醒等待的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class SemaphoreDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 ); for (int i = 0 ; i < 6 ; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "抢到资源" ); try { TimeUnit.SECONDS.sleep(3 ); } catch (InterruptedException e) {e.printStackTrace();} System.out.println(Thread.currentThread().getName() + "释放了资源" ); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }, String.valueOf(i)).start(); } } }
十三、ReadWriteLock 读-读可共享,写-读、写-写要独占。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class MyCache { private volatile Map<String,Object> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void put (String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "写入数据" ); try { TimeUnit.MILLISECONDS.sleep(100 ); } catch (InterruptedException e) {e.printStackTrace();} map.put(key,value); System.out.println(Thread.currentThread().getName() + "写入完成" ); } catch (Exception e) { e.printStackTrace(); }finally { readWriteLock.writeLock().unlock(); } } public void get (String key) { readWriteLock.readLock().lock(); try { System.out.println("读取数据" ); try { TimeUnit.MILLISECONDS.sleep(150 ); } catch (InterruptedException e) {e.printStackTrace();} map.get(key); System.out.println(Thread.currentThread().getName() + "读取完成" ); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } } public class ReadWriteLockDemo { public static void main (String[] args) { MyCache myCache = new MyCache(); for (int i = 0 ; i < 10 ; i++) { int finalI = i; new Thread(() -> { myCache.put(finalI + "" , finalI + "" ); },String.valueOf(i)).start(); } for (int i = 0 ; i < 5 ; i++) { int finalI = i; new Thread(() -> { myCache.get(finalI + "" ); },String.valueOf(i)).start(); } } }
十四、BlockingQueue(阻塞队列) 必须要阻塞和不得不阻塞的时候使用。
在多线程中,就是在有些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会被自动唤起。BlockingQueue帮我们完成了什么时候需要阻塞线程、什么时候需要唤醒线程。concurrent发布之前,这些需要自己实现。
当队列是空的时候,从队列中获取元素的操作就会被阻塞,直到其他线程往空的队列中加入新的元素。
当队列是满的,向队列中添加元素的操作就会被阻塞,直到其他线程从队列中移除一个或多个元素。
ArrayBlockingQueue :由数组结构组成的有界阻塞队列
LinkedBlockingQueue :由链表组成的有界(但大小默认为integer.MAX_VALUE)阻塞队列
SynchronousQueue :不存储运输的阻塞队列,即单个元素的队列。
LinkedBlockingdeque :由链表组成的双向阻塞队列。
四组方法:
方法类型
抛出异常
特殊值
阻塞
超时
插入
add(e)
offer(e)
put(e)
offer(e, time, unit)
移除
remove()
poll()
take()
poll(time, unit)
查看
element()
peek()
—
—
说明:
情况
情况说明
抛出异常
当阻塞队列满时,再往队列里add插入元素会抛IllegalStateException:Queue full 当阻塞队列空时,再往队列里remove移除元素会抛NoSuchElementException
特殊值
插入方法,成功ture失败false移除方法, 成功返回出队列的元素,队列里没有就返回null
一直阻塞
当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断退出 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用
超时退出
当阻塞队列满时,队列会阻塞生产者线程一定时间,超过限时后生产者线程会退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class BlockingQueueDemo { public static void main (String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3 ); System.out.println(blockingQueue.offer("a" )); System.out.println(blockingQueue.offer("b" )); System.out.println(blockingQueue.offer("c" )); System.out.println(blockingQueue.offer("a" , 3 , TimeUnit.SECONDS)); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll(3 ,TimeUnit.SECONDS)); } }
十五、ThreadPool(线程池) 线程池控制线程的数量,处理过程中将任务放入队列,然后在线程创建之后启这些任务,如果线程数量超出了最大数量,超出数量的线程排队等候,等待其他线程执行完毕,再从队列中取出任务来执行。线程是稀缺资源,如果无限制的创建,不仅仅会消耗系统资源,还会降低系统稳定性,用线程池可以进行统一的分配、调优和监控。
主要特点:线程复用、控制最大并发量、管理线程。
Executors:线程池工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class ThreadPoolDemo { public static void main (String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i = 0 ; i < 10 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() +"办理业务" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
1. 底层源码 实际上上述三种方法调用的是同一个方法,返回了一个ThreadPoolExecutor对象。
Executors.newFixedThreadPool(nThreads)
1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
Executors.newSingleThreadExecutor()
1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Executors.newCachedThreadPool()
1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor(0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
2. 线程池的7个参数 1 2 3 4 5 6 7 8 9 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
corePoolSize:线程池的常驻线程数
maximumPoolSize:线程池能够容纳同时执行的最大线程数,>=1
keepAliveTime:多余空闲线程的存活时间,当数量超过corePoolSize时,空闲时间超过改值,那么超过corePoolSize数量的线程会被销毁。
unit:keepAliveTime的单位
workQueue:阻塞任务队列BlockingQueue,被提交但没有执行的任务
threadFactory:用于创建线程的线程工厂,默认
handler:拒绝策略,当队列中的任务满了并且工作线程大于等于线程池的最大线程数时,如何拒绝请求执行的runnable的策略。
3. 执行流程
4. 实际使用(重要) 实际中不使用Executors去创建 ,而是通过ThreadPoolExcutor去创建,避免资源被耗尽。
FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会造成堆积大量请求,导致OOM(out of memory)。
CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致OOM。
1 2 3 4 5 6 7 8 9 ExecutorService threadPool = new ThreadPoolExecutor( 2 , 5 , 3L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
5. 四种拒绝策略
AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行
CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不 会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加人队列中 尝试再次提交当前任务。
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。 如果允许任务丢失,这是最好的一种策略。
6. 最大线程数配置方式(重要)
CPU密集型任务
maximumPoolSize 大小为 电脑的CPU核数加一(不允许写死 ),通过 Runtime.getRuntime().availableProcessors();
来获得核数。
IO 密集型任务:
maximumPoolSize = CPU的核数 / CPU的阻塞系数。
十六、链式编程 + 流式计算 java中的@Accessors(chain = true) 标识允许以链式的方式书写代码。
JDK中常用的函数式接口:(只要是函数式接口就可以写成 lambda 表达式的形式。)
1. Stream 流 有了 Stream 基础后,数据库的相应排序、筛选等操作即可通过流的方式直接在后台处理数据,只需要通过数据库获得原始数据即可,提高数据库性能。
① 简介: Stream 是数据渠道,用于操作数据源(集合和数组等)所生成的元素序列,集合讲的是数据,流讲的是计算 。
② 特点
Stream 不会存储元素
Stream 不会改变源对象,会返回一个持有结果的新的Stream
Stream 操作是延迟进行的,这意味着他们会等到需要结果的时候才执行。
③ 使用
创建:创建一个Stream:一个数据源(数组、集合)
中间操作:一个中间操作,处理数据源数据
终止操作:一个终止操作,执行中间操作链,产生结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Data class User { private Integer id; private String userName; private int age; } public class StreamDemo { public static void main (String[] args) { User u1 = new User(11 ,"a" ,23 ); User u2 = new User(12 ,"b" ,24 ); User u3 = new User(13 ,"c" ,22 ); User u4 = new User(14 ,"d" ,28 ); User u5 = new User(16 ,"e" ,26 ); List<User> list = Arrays.asList(u1,u2,u3,u4,u5); list.stream().filter(p -> { return p.getId() % 2 == 0 ; }).filter(p -> { return p.getAge() > 24 ; }).map(f -> { return f.getUserName().toUpperCase(); }).sorted((o1, o2) -> { return o2.compareTo(o1); }).limit(1 ).forEach(System.out::println); Function<String,Integer> function = t -> {return t.length();}; System.out.println(function.apply("abc" )); Predicate<String> predicate = t -> {return t.startsWith("a" );}; System.out.println(predicate.test("a" )); Consumer<String> consumer = t -> {System.out.println(t);}; consumer.accept("java1018" ); Supplier<String> supplier = () -> {return UUID.randomUUID().toString();}; System.out.println(supplier.get()); } }
十七、ForkJoin(分支合并) 将多个复杂任务拆解为多个任务,即fork的过程,多个线程同时计算,计算完以后再将结果合并,即jion的过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 class MyTask extends RecursiveTask <Integer > { private static final Integer ADJUST_VALUE = 10 ; private int begin; private int end; private int result; public MyTask (int begin, int end) { this .begin = begin; this .end = end; } @Override protected Integer compute () { if ((end - begin)<=ADJUST_VALUE){ for (int i =begin;i <= end;i++){ result = result + i; } }else { int middle = (begin + end)/2 ; MyTask task01 = new MyTask(begin,middle); MyTask task02 = new MyTask(middle+1 ,end); task01.fork(); task02.fork(); result = task01.join() + task02.join(); } return result; } } public class ForkJoinDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { MyTask myTask = new MyTask(0 ,100 ); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask); System.out.println(forkJoinTask.get()); forkJoinPool.shutdown(); } }
十八、异步回调 像Axios的异步访问一样,启用异步任务后,执行完会进入异步回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class CompletableFutureDemo { public static void main (String[] args) throws Exception { CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"\t completableFuture2" ); int i = 10 /0 ; return 1024 ; }); completableFuture2.whenComplete((t,u)->{ System.out.println("-------t=" +t); System.out.println("-------u=" +u); }).exceptionally(f->{ System.out.println("-----exception:" +f.getMessage()); return 444 ; }).get(); } }
本文为作者原创文章,未经作者允许不得转载。