JAVA并发案例分析-高性能队列Disruptor的设计

Disruptor的使用方法

  1. 生产消费针对对象event,定义Event
  2. 构造Disruptor时候需要实现一个EventFactory,这里是LongEvent::new
  3. 消费者要注册是一个handleEvent
  4. 生产者要通过publishEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 自定义 Event
class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
// 指定 RingBuffer 大小,
// 必须是 2 的 N 次方
int bufferSize = 1024;

// 构建 Disruptor
Disruptor<LongEvent> disruptor
= new Disruptor<>(
LongEvent::new,
bufferSize,
DaemonThreadFactory.INSTANCE);

// 注册事件处理器
disruptor.handleEventsWith(
(event, sequence, endOfBatch) ->
System.out.println("E: "+event));

// 启动 Disruptor
disruptor.start();

// 获取 RingBuffer
RingBuffer<LongEvent> ringBuffer
= disruptor.getRingBuffer();
// 生产 Event
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++){
bb.putLong(0, l);
// 生产者生产消息
ringBuffer.publishEvent(
(event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}

Disruptor高效的要点

  1. 数据结构采用RingBuffer并且针对RingBuffer做了如下的优化,初始化时候利用EventFactory的newInstance方法创建所有的元素,
    1. 由于一起创建所以这些元素在内存地址上是连续的,在消费元素时候有效的理由了CPU缓存(程序的局部性)当消费元素a的时候,a+1会加载到CPU的cashe中。
    2. 在生产元素时候,利用setEvent这种方式重用对象,避免重新创建对象频繁的gc
  2. 解决伪缓存的方式:利用内存填充的方式防止变量共享一个缓存行,在无锁的并发情况下导致缓存行重复失效。
  3. CAS的无锁设计高效生产消费队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 生产者获取 n 个写入位置
do {
//cursor 类似于入队索引,指的是上次生产到这里
current = cursor.get();
// 目标是在生产 n 个
next = current + n;
// 减掉一个循环
long wrapPoint = next - bufferSize;
// 获取上一次的最小消费位置
long cachedGatingSequence = gatingSequenceCache.get();
// 没有足够的空余位置
if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
// 重新计算所有消费者里面的最小值位置
long gatingSequence = Util.getMinimumSequence(
gatingSequences, current);
// 仍然没有足够的空余位置,出让 CPU 使用权,重新执行下一循环
if (wrapPoint > gatingSequence){
LockSupport.parkNanos(1);
continue;
}
// 从新设置上一次的最小消费位置
gatingSequenceCache.set(gatingSequence);
} else if (cursor.compareAndSet(current, next)){
// 获取写入位置成功,跳出循环
break;
}
} while (true);