CountDownLatch

CountDownLatch解读

  • 是基于AQS原理,共享锁的一种实现,构造函数里的count值代表计数器,会将count赋值给state。
  • CountDownLatch不可重入。
  • 通过await()来阻塞线程。countDown使计数器-1,当count为0时候所有await的线程会同时获取锁。

实现原理

CountDownLatch有一个内部类Sync,该类继承了AQS,await()方法和countDown方法都是调用Sync的tryAcquireShared和tryReleaseShared

共享锁获取锁-tryAcquireShared

CountDownLatch掉用await()实际上是调用sync的acquireSharedInterruptibly,最终会调用AQS中的doAcquireSharedInterruptiblyState,在该方法中调用doAcquireSharedInterruptibly,在改方法中,会调用CountDownLatch实现的tryAcquireShared方法如果state==0返回1,否则-1,其中代码和逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程封装成SHARDED类型的节点,加入到链表中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//自旋来获取锁,获取前置节点,如果前置节点是头结点,尝试获取锁,将当前节点设置为头节点,开始传播为后置节点解锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
/**自旋解锁:唤醒头结点的线程并且开始根据链表顺序传播解锁。
* 因为是公平锁,所以无论现在在哪个线程都会根据链表顺序从head节点开始唤醒
**/
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}

//通独占锁,根据前置节点的情况判断是否应该休眠(前置节点是SINGAL,当前节点的线程会block,如果前置节点是cancel会跳过所有cancel的节点)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
//当前的头节点当中间变量同时设置头结点
Node h = head;
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
//propagate>0(这里是state==0),或者头结点为空,或者头结点不是CANCELED状态,或者现在的头节点为空,或者头结点不是CANCELED状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;//把当前Node状态之后的节点Shared都唤醒
if (s == null || s.isShared())
doReleaseShared();
}
}

private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//自旋解锁,如果头结点的状态是SIGNAL,因为刚才将当前节点置为了Head,实际就是当前节点,同时一般ws也是SINGAL
if (ws == Node.SIGNAL) {
//cas成功后将头结点的线程解锁
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
//刚才的setHead()方法已经将Thread=null所以,如果修改成功了会跳出循环
continue; // loop on failed CAS
}

//上面修改成功了node是
if (h == head) // loop if head changed
break;
}
}

共享锁释放-tryAcquireShared

CountDownLatch调用countDown()时候回调用sync.releaseShared(1),实际是AQS的releaseShared(int args)方法,如果返回true即state==0,在该方法中会调用doReleaseShared(),获取锁和传播获取锁的流程

CountDownLatch中的Sync的tryReleaseShared:

1
2
3
4
5
6
7
8
9
10
11
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

AQS中的releaseShared代码

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}