java的队列同步器AQS

AbstractQueuedSynchronizer

是java.util.concurrent.locks中最重要的类。翻译过来是“抽象队列同步器”,我们根据字面来理解:

  • Abstract-它是抽象的,业务方根据自己的场景来实现tryAcqurie、tryRelease等方法。例如:CountDownLatch、CyclicBarrier、Semaphore、ReentrantLock等;
  • Queue-它维护了一个FIFO的队列,竞争锁的线程都会加入到这个对列中,排队竞争资源;
  • Synchronizer-它用来保证在并发情况下,线程安全。

优点:
使用方便:满足日常线程调度的需求,通过CountDownLatch、CyclicBarrier、Semaphore,ReentrantLock很轻松实现了共享锁、公平锁等功能。
性能:在jdk1.6之前synchronized关键字很重,性能很不好(1.6之后引入偏向锁、自旋锁、轻量级锁性能已经替身了许多)

主要逻辑

以ReentrantLock为例,来解释加锁和释放锁的逻辑

加锁

调用acquire方法,对业务进行加锁,acquire流程如下:

  1. 会调用tryAcquire(1)尝试获取锁,如果失败将当前线程加入到队列中;
  2. addWatier将当前的线程封装成EXCLUSIVE模式的Node加入到队列中,其中为了避免失败采用了自旋的方法;
  3. 之后通过自旋acquireQueued获取锁资源,然后根据之前记录的线程是否被中断过的状态,进行线程中断,acquireQueued逻辑如下;
    • 获取当前节点的前置节点,如果前置节点是头结点,说明自己可以开始竞争锁资源了,如果成功,将当前节点置为头节点返回;
    • 如果当前节点不是头结点,调用shouldParkAfterFailedAcquire判断该节点是否可以进入休眠状态,分支逻辑如下:
      • 前置节点如果的waitStatus如果是SIGNAL,说明后续节点可以休息返回ture;
      • 如果前置节点的waitStatus是CANCELLED状态(>0),通过循环跳过当前节点前置所有的CANCELLED节点;(之后结合释放锁来看)
      • 否则将前置节点的状态CAS改为SIGNAL,acquireQueued下一次自旋,节点就会返回true;
  4. 如果返回允许挂起后调用parkAndCheckInterrupt(),通过LockSupport.lock()方法挂起当前线程;
  5. 获取锁资源后,判断线程是否被中断过调用:selfInterrupt()方法来中断线程。

涉及的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
独占锁的获取锁方法
调用tryAcquire获取锁,如果失败将当前的节点和state加入到队列中,开始竞争/排队获取锁。
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

//将当前线程封装成node(mode是独占模式)插入到队列末尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速插入将node插入到队列的末尾,只插入一次,失败后通过自旋重复插入直到成功
Node pred = tail;
if (pred != null) {
node.prev = pred;
//compareAndSetTail实现上是通过,修改tail字段的值
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//失败后用自旋开始插入
enq(node);
return node;
}

//自旋插入队列尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//如果tail为空。初始化队列(tail=head=new Node),然后下次循环将node插入到到tail后面,这时候 new Node()->node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//自旋找到头结点,获取锁
for (;;) {
//获取前置节点
final Node p = node.predecessor();
//如果前置节点是头节点,tryAcquire获取锁
if (p == head && tryAcquire(arg)) {
//成功后将当前节点设置为头节点,interrupted=false
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

//如果失败判断状态,并且挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

//判断是否该挂起线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前置节点的类型
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置节点状态为SIGNAL,当前节点可以挂起
return true;
if (ws > 0) {//代码为什么不ws==Node.CANCELLED???
//循环链表,跳过所有前置节点watiStatus是CANCELLED的节点,直到第一个不是CANCELLED的节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//等待状态是是Propagated状态,CAS改为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

//
private final boolean parkAndCheckInterrupt() {
//具体实现见下面LockSupport.park详解
LockSupport.park(this);
return Thread.interrupted();
}

释放锁

调用acquire方法,对业务进行加锁,acquire流程如下:

调用release方法多锁进行释放,其中逻辑如下,ps:ReentrantLock的公平锁模式会判断是否有前置节点,如果有才会修改state

  1. 通过调用tryRelease(1)释放锁,成功后取到头head链表的head节点(一般head节点持有的线程都是自己);
  2. 判断head节点的状态,不为空,同时waitStatus!=0,因为在acqureQueued如果有后置节点已经将他改为SIGNAL状态,所以如果为空说明没有后置节点;
  3. 调用unparkSuccessor(h),唤醒后置节点,具体逻辑如下:
    • 获取头节点位置,见头节点位置将头节点waitStatus置为0;
    • 找到后置节点,如果后置节点为空或者是CANCELLED,从尾部开始遍历链表,找到离头节点最近的一个不是CANCELLED状态的节点
    • 通过LockSupport.unpark(s.thread)唤醒线程开始竞争锁;
  4. 这时候这个被唤醒的线程回到acquireQueued的自旋中,走到“p == head && tryAcquire(arg)”这个分支中,即时这时候p!=head,走入shouldParkAfterFailedAcquire方法中,由于这个节点到head之前的节点都赢是CANCELLED对象(见上报从尾部开始遍历链表),所以跳过所有CANCELLED节点,下次必然会p==head,获取锁。

涉及的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

//唤醒后置节点
private void unparkSuccessor(Node node) {
//获取头节点状态,置为0防止重复唤醒
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

//找到后置节点,如果后置节点为空,或者状态是CANCELLED,从尾部开始遍历链表,找到离头节点最近的一个不是CANCELLED状态的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒节点
if (s != null)
LockSupport.unpark(s.thread);
}

总结

AQS利用队列实现了在并发情况下,多线程之间的调度问题;并且合理利用了自旋和挂起笔名了线程之间上下文切换的损耗,和无效的竞争。

其他相关知识

LockSupport对线程阻塞和唤醒(park/unpark)

相比wait和notify的优势

  1. 不用先用synchronized获取同步锁。
  2. LockSupport是基于线程对象的,而wait和notify是基于锁对象的,所以不会遇到遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题

源码如下:

1
2
3
4
5
6
7
8
9
public static void park(Object blocker) {
//获取当前线程,并且设置parkBlocker对象,jstack查看时候就可以知道线程是在哪个对象上阻塞,这里是AQS的实现类
Thread t = Thread.currentThread();
setBlocker(t, blocker);
//阻塞线程,绝对时间,并且无限制的阻塞
UNSAFE.park(false, 0L);
//阻塞停止,将parkBlocker置为空。
setBlocker(t, null);
}

AQS中FIFO队列介绍,内部类Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;

volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

}