Future:如何用多线程实现最优的“烧水泡茶”程序?

线程池中可以通过Future模式实现对异步任务的状态的监控和获取异步任务的结果,具体见ExecutorService下的3个方法

获取异步任务的结果

1
2
3
4
5
6
// 提交 Runnable 任务
Future<?> submit(Runnable task)
// 提交 Callable 任务
<T> Future<T> submit(Callable<T> task);
// 提交 Runnable 任务及结果引用
<T> Future<T> submit(Runnable task, T result);

第一个方法是无返回值很类似join方法
第二个方法是可以获取task的返回值
第三个方法是可以通过T result来实现俩个线程之间的交互。如下

FutreTask

和Furtre很类似但是它继承自Furture和Runable可以作为对象传递给线程池或者线程。然后通过get来获取执行结果。如下代码,开水泡茶问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FutureTest {
private static ExecutorService executor = Executors.newFixedThreadPool(2);

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> ft2 = new FutureTask<>(() -> {
System.out.println("洗茶杯");
Thread.sleep(1000L);

System.out.println("找茶叶");
Thread.sleep(1000L);

return "龙井";
});

FutureTask<String> ft1 = new FutureTask<>(() -> {
System.out.println("洗茶壶");
Thread.sleep(1000L);

System.out.println("烧水");
Thread.sleep(10000L);
System.out.println("水开了");
String ret = ft2.get();
return "上茶"+ret;
});


executor.submit(ft1);
executor.submit(ft2);
System.out.println(ft1.get());
}
}

近期的使用场景:
比如一个很大的列表查询,你可以拆分成10个线程同时查询然后在查询完后合并成一个结果集。

CompletableFuture

简化Future的线程调度难度。构造函数是4个静态方法

1
2
3
4
5
6
// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 可以指定线程池 对于IO这种消耗线程的方式强烈推荐
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

ft1完成后执行的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Function对象有参数有返回值
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);

# Consumer对象有参数无返回值
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);

# Runnable对象无参数无返回值
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);

CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);

And合并关系

ft1、ft2都结束后做一些操作,类似join。方法区别如上

1
2
3
4
5
6
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);

OR或者关系

ft1、ft2有一个完成

1
2
3
4
5
6
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);

异常处理

1
2
3
4
5
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);

CompletionService

我们用ThreadPool和Future优化了步编程中协作的部分,取代了线程的join。但是我们在实际使用中往往会碰到下面的问题,具体见代码

  1. 运行了3个异步任务,分别用了15s,10s,5s。
  2. 之后会在另一个线程池对这些任务的结果进行计算。这时候在计算的线程池中会变成串行,由于1任务过久,所以任务2,3即便先完成了前面的步骤也要等1完成后在执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(3);

Future<Integer> ft1 = threadPool.submit(() -> {
Thread.sleep(15000);
System.out.println("ft1 sleep 15s return 1");
return 1;
});

Future<Integer> ft2 = threadPool.submit(() -> {
Thread.sleep(10000);
System.out.println("ft2 sleep 10s return 2");
return 2;
});

Future<Integer> ft3 = threadPool.submit(() -> {
Thread.sleep(5000);
System.out.println("ft3 sleep 5s return 3");
return 3;
});


ExecutorService calT=Executors.newSingleThreadExecutor();
Future ftRes=calT.submit(()->{
try {
System.out.println("save:"+ft1.get());
System.out.println("save:"+ft2.get());
System.out.println("save:"+ft3.get());
//todo 比较大小
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});

ftRes.get();
}

如果对这段代码进一步优化呢我们可以采用增加阻塞队列的方式。许多人加个阻塞队列就好了。异步执行的结果加入到阻塞队列中,最终通过阻塞队列后去方法。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
BlockingQueue<Integer> queue=new ArrayBlockingQueue<Integer>(3);
ExecutorService calT=Executors.newFixedThreadPool(3);
calT.execute(()-> {
queue.put(ft1.get());
});
calT.execute(()-> {
queue.put(ft2.get());
});
calT.execute(()-> {
queue.put(ft3.get());
});
for(int i=0;i<3;i++){
try {
System.out.println("save:"+ queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("all finish");

确实能解决上述的问题,JAVA8之后为我们提供了更方便的写法(实现原理相同),即:CompletionService,有另种狗仔方法

1
2
new ExecutorCompletionService(Executor executor); //LinkedBlockingQueue 默认使用
new ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)

上面的程序可以简写为这样

1
2
3
4
5
6
7
8
9
10
11
ExecutorCompletionService<Integer> ecs=new ExecutorCompletionService(threadPool,queue);
ecs.submit(callable1);
ecs.submit(callable2);
ecs.submit(callable3);
for(int i=0;i<3;i++){
try {
System.out.println("save:"+ ecs.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
}
}

关于take()和pool()区别。take()在队列为空会阻塞线程,poll如果队列为空会返回null,

1
2
3
4
5
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;//会等待一段时间。

使用场景

Dubbo的Forking Cluster模式,对于实时性较高的操作,consumer端会同时并行调用俩个请求,哪个现有结果优先返回,代码如下,不过缺点是比较浪费资源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
*
* @param fork 并行度是2
* @param timeOutMills 超时时间
* @param callable 逻辑方法
* @param <T> 返回值,多个方法返回第一个返回的
* @return
*/
public <T> T forkCluster(int fork, int timeOutMills, Callable<T> callable) {
ExecutorCompletionService<T> forkService = new ExecutorCompletionService(Executors.newFixedThreadPool(fork), new ArrayBlockingQueue<>(fork));
List<Future<T>> ftContainer=new ArrayList<>();
for (int i = 0; i < 2; i++) {
ftContainer.add(forkService.submit(callable));
}

try {
T result = forkService.poll(timeOutMills, TimeUnit.MILLISECONDS).get();
if (result == null) {
throw new RuntimeException("返回结果为空抛异常");
}
return result;
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("返回结果为空抛异常");
} catch (ExecutionException e) {
e.printStackTrace();
throw new RuntimeException("返回结果为空抛异常");
}finally {
ftContainer.forEach(ft->{
ft.cancel(true);
});
}
}

Fork/join用分治思想加速我们的递归

待完善