Guarded Suspension模式:等待唤醒机制的规范实现

前面提到了在RPC调用中的同步转异步的过程,见Dubbo的DefaultFeture。
我们把这种模式成为Guarded Suspension(守护挂起),又称多线程的if,有一下几个特点

  1. 创建一个GuardedObject包含ID和Response对象,并且在get是否判断Response对象是否为空,如果为空该线程wait住
  2. 在其他的线程对GuardedObject的Response赋值,并且notify。
  3. 主要get和setResponse的要加锁。

具体实现见下面

  1. 通过GuardObject.create创建一个实例
  2. 通过get()方法获取并且阻塞住
  3. 返回时候其他线程通过通过GuardObject.objArraived赋值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
private static Map<String, GuardObject> container = new ConcurrentHashMap<>();
private Lock lock = new ReentrantLock();
private Condition wait = lock.newCondition();

private String id;
private T obj;

private GuardObject(String id) {
this.id = id;
}

/**
* 创建并且放在容器中
* @param id
* @param <T>
* @return
*/
public static <T> GuardObject<T> create(String id) {
GuardObject<T> guardObject = new GuardObject<>(id);
container.put(id, guardObject);
return guardObject;
}

//判断条件
private boolean isArrived() {
return obj != null;
}

//为response赋值
public static <T> void objArraived(String id, T obj) {
GuardObject<T> guardObject = container.get(id);
if (guardObject == null) {
throw new RuntimeException("not exists");
}

guardObject.arrived(obj);
}

//等待通知机制
private void arrived(T obj){
lock.lock();
try {
this.obj = obj;
wait.signalAll();
} finally {
lock.unlock();
}
}



public T get() {
lock.lock();

try {
//最佳实现,没有response等待
while (!isArrived()) {
//等待,增加超时机制
try {
wait.await(10L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}

if (!isArrived()) {
throw new RuntimeException("timeout");
}
}
} finally {
lock.unlock();
}
return obj;
}

public String getId() {
return id;
}

public T getObj() {
return obj;
}