Semaphore:如何构建限流器

信号量模型

如图:

avatar

简单来说就是一个队列,一个计数器,三个方法

init()–初始化队列和信号量
down()–信号量-1如果<0,当前线程阻塞,并且进入队列等待。
up()–信号量+1如果>=0,则唤醒一个线程别且把它从当前队列中剔除。

在java当中的Semaphore的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
 static int count;
// 初始化信号量
static final Semaphore s
= new Semaphore(1);
// 用信号量保证互斥
static void addOne() {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}

信号量是如何保证原子性的呢?
俩个线程都执行s.acquire()是原子性的会修改计数器,假设线程1先执行,所以计数器是0,线程2为-1,因为线程2小于0所以线程2阻塞,并且将当前线程加入到队列中线程1大于等于0所以线程1执行count+=1;这时候线程1执行s.release();计数器为0,大于等于0,所以唤醒队列中的一个线程。

Semaphore的作用

和Lock相比可以同时允许多个线程同时访问临界区。下面的代码是事先了一个队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore sem;
// 构造函数
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 利用对象池的对象,调用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
pool.add(t);
sem.release();
}
}
}
// 创建对象池
ObjPool<Long, String> pool =
new ObjPool<Long, String>(10, 2);
// 通过对象池获取 t,之后执行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});