线程池中可以通过Future模式实现对异步任务的状态的监控和获取异步任务的结果,具体见ExecutorService下的3个方法
获取异步任务的结果
1 | // 提交 Runnable 任务 |
第一个方法是无返回值很类似join方法
第二个方法是可以获取task的返回值
第三个方法是可以通过T result来实现俩个线程之间的交互。如下
FutreTask
和Furtre很类似但是它继承自Furture和Runable可以作为对象传递给线程池或者线程。然后通过get来获取执行结果。如下代码,开水泡茶问题
1 | public class FutureTest { |
近期的使用场景:
比如一个很大的列表查询,你可以拆分成10个线程同时查询然后在查询完后合并成一个结果集。
CompletableFuture
简化Future的线程调度难度。构造函数是4个静态方法
1 | // 使用默认线程池 |
ft1完成后执行的操作
1 | # Function对象有参数有返回值 |
And合并关系
ft1、ft2都结束后做一些操作,类似join。方法区别如上
1 | CompletionStage<R> thenCombine(other, fn); |
OR或者关系
ft1、ft2有一个完成
1 | CompletionStage applyToEither(other, fn); |
异常处理
1 | CompletionStage exceptionally(fn); |
CompletionService
我们用ThreadPool和Future优化了步编程中协作的部分,取代了线程的join。但是我们在实际使用中往往会碰到下面的问题,具体见代码
- 运行了3个异步任务,分别用了15s,10s,5s。
- 之后会在另一个线程池对这些任务的结果进行计算。这时候在计算的线程池中会变成串行,由于1任务过久,所以任务2,3即便先完成了前面的步骤也要等1完成后在执行
1 |
|
如果对这段代码进一步优化呢我们可以采用增加阻塞队列的方式。许多人加个阻塞队列就好了。异步执行的结果加入到阻塞队列中,最终通过阻塞队列后去方法。如下:
1 | BlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(3); |
确实能解决上述的问题,JAVA8之后为我们提供了更方便的写法(实现原理相同),即:CompletionService,有另种狗仔方法
1 | new ExecutorCompletionService(Executor executor); //LinkedBlockingQueue 默认使用 |
上面的程序可以简写为这样
1 | ExecutorCompletionService<Integer> ecs=new ExecutorCompletionService(threadPool,queue); |
关于take()和pool()区别。take()在队列为空会阻塞线程,poll如果队列为空会返回null,
1 | Future<V> submit(Callable<V> task); |
使用场景
Dubbo的Forking Cluster模式,对于实时性较高的操作,consumer端会同时并行调用俩个请求,哪个现有结果优先返回,代码如下,不过缺点是比较浪费资源
1 | /** |
Fork/join用分治思想加速我们的递归
待完善