CountDownLatch和CyclicBarrier:如何让多线程步调一致?

CountDownLatch和CyclicBarrier主要解决线程的调度问题。例如如下场景:订单的对账,订单系统完成的订单和已经派发的订单的对账,对于有异常的订单要修复。这里是3个操作,见下面伪代码

1
2
3
4
5
Order order=getOrder();

POrder porder=getPOder();

checkAndFix(order,porder);

如果每天的订单量很大这就会很耗时,我们可以考虑串行的方法,可以用线程池的方法同时去查询俩个订单,然后在check这时候我们要解决线程调度的问题,CountDownLatch就排上了用场,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService pool = Executors.newFixedThreadPool(2);
AtomicReference<Object> order = new AtomicReference<>();
pool.execute(() -> {
order.set(getOrder());
countDownLatch.countDown();
});

AtomicReference<Object> porder = new AtomicReference<>();
pool.execute(() -> {
porder.set(getPOrder());
countDownLatch.countDown();
});

try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
checkAndFix(order.get(),porder.get());

不过这里checkAndFix还是主线程串行的能否在进一步优化呢,答案是能这里用到CyclicBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
ExecutorService callBack = Executors.newFixedThreadPool(1);
Vector<Object> orders=new Vector<>();
Vector<Object> porders=new Vector<>();
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
callBack.execute(()->{
checkAndFix( orders.get(0),porders.get(0));
});
});

while(true){
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.execute(() -> {
orders.add(getOrder());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
pool.execute(() -> {
porders.add(getPOrder());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}

CyclicBarrier和CountDownLatch的区别是可重入的,当见到0还会从2开始。另外CyclicBarrier可以设置Callback