生产者消费者模式--解耦整个业务

生产者消费者模式的使用场景和作用

  1. 业务解耦:产者负责生产任务丢给队列,消费者负责从队列中获取任务做逻辑
  2. 削峰:串行的业务往往会导致服务夯住,如果采用生产者消费者,消费者负责将业务丢给队列即可返回,消费者可独立扩容多个来负责消费任务

消费者模式的几种实现

批量消费

我们可以从队列中一个个的消费任务,但是在某些场景下我们其实批量的执行任务会更提高效率,比如异步写入数据的逻辑,一条条会建很多链接,我们可以一次拉取N个任务一次执行,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class BatchQueue<T> {

private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

private int batchSize;

private BatchJob<T> job;

private ExecutorService pool;

public BatchQueue(BatchJob<T> job, int batchSize, int consumerSize) {
this.batchSize = batchSize;
this.job = job;
pool = Executors.newFixedThreadPool(consumerSize);
pool.execute(() -> {
while (true) {
//异步消费
List<T> tasks = pollTask();
job.run(tasks);
}
});
}

public void addTask(T task) {
queue.add(task);
}

private List<T> pollTask() {
List<T> list = new ArrayList<>();
T task = null;
try {
task = queue.take();//减少循环第一次先等待
} catch (InterruptedException e) {
e.printStackTrace();
}

while (task != null && list.size() < batchSize) {

//循环里用poll,满足batchSize或者为空返回
list.add(task);
task = queue.poll();
}

return list;
}

abstract static class BatchJob<T> {
public abstract void run(List<T> task);
}

public static void main(String[] args) {
BatchQueue<Object> queue = new BatchQueue<>(new BatchJob<Object>() {
@Override
public void run(List<Object> task) {
System.out.println(task.size());
System.out.println(task);
}
}, 3, 10);


for (int i = 0; i < 100; i++) {
queue.addTask("i=" + i);
}
}
}

俩阶段提交

之前讨论Mysql的时候,我们提到过Mysql脏页的俩阶段提交,我们也可以使用消费者模式来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();

private int index = 0;
private long ptf = System.currentTimeMillis();

public void addTask(T task) {
queue.add(task);

ExecutorService executorService=Executors.newFixedThreadPool(10);
executorService.execute(()->{
try {
consumer();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

private void consumer() throws InterruptedException {
Object obj = queue.poll(5, TimeUnit.SECONDS);

while (true) {
if (obj != null) {
System.out.println("write");
//write to
}

if (index <= 0) {
return;
}

//级别等于Error or 500条 or >=5秒
if (index == 500 || System.currentTimeMillis() - ptf >= 5000L) {
System.out.println("flush");
index = 0;
ptf = 0L;
}
}
}