RocketMQ源码学习-八-延迟队列

在业务中发送定时消息等场景会用到延迟队列。RocketMQ内置了该功能,帮我们实现了这种延迟队列,实现原理也很简单。

实现方式、原理

  1. 生产者发送消息时通过setDelayTimeLevel为消息设置延迟级别;RocketMQ不支持自己指定任意的延时时间,只能采用系统设置好的level
  2. broker在putMessage的时候会将message的real-topic备份,然后将message放到SCHEDULE_TOPIC_XXXX这个topic中,并根据delayLevel指定不同的queueId。
  3. ScheduleMessageService会启动timer定期扫描各个delayLevel的Queue,对已经到执行时间的message,再将其从SCHEDULE_TOPIC_XXXX移到real-topic中。

缺点:由于是一个timer在扫描SCHEDULE_TOPIC_XXXX下所有queue的消息,如果消息积压过多,则可能造成消息的发送延迟。可以考虑每个level用一个线程去扫描(不过也要考虑线程切换的成本,频繁切换反而可能降低性能)

关键代码

producer

producer发送消息,设置level

1
2
3
/**
 * Records the requested delay level on this message as the
 * {@code PROPERTY_DELAY_TIME_LEVEL} property.
 *
 * @param level the delay level to store, written as its decimal string form
 */
public void setDelayTimeLevel(int level) {
    final String levelText = Integer.toString(level);
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, levelText);
}

broker

broker处理putMessage

  1. CommitLog.putMessage保存信息会将真实的topic的备份放到指定队列
  2. DefaultMessageStore的ReputMessage在写ConsumeQueue的时候会计算DeliverTime并且写入到tagsCode中

见代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Excerpt of the put path showing how a delayed message is redirected.
 * When the message carries a positive delay level, its real topic and queue id are
 * saved as message properties and the message is re-addressed to the schedule topic,
 * using a queue id derived from the delay level. Elided parts are marked with "......".
 *
 * @param msg the message being stored; its topic, queue id and properties are modified
 *            in place for delayed messages
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
......
// Only non-transactional or transaction-commit messages are considered for delayed delivery
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery Message
if (msg.getDelayTimeLevel() > 0) {
// Clamp the requested level to the highest configured level
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

// Delayed message: redirect it to the schedule topic, with the queue id chosen from the delay level
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId as message properties before they are overwritten below
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
......

// NOTE(review): excerpt artifact - the declared return type is PutMessageResult, not boolean
return true;
}

// When the message entry is stored into the ConsumeQueue, tagsCode is set to the delivery time
// (see checkMessageAndReturnSize, which this excerpt calls).
private void doReput() {
......
// Build the dispatch request used to replay the message.
// Original note lists result codes -1 = failure, 0 = end of file, 1 = normal - TODO confirm against the full source
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
......
}

/**
 * Excerpt showing how tagsCode is replaced for delayed messages: when the message is in
 * the schedule topic and carries a delay-level property, tagsCode is set to the computed
 * deliver timestamp. Elided parts are marked with "......".
 */
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
......
// Timing message processing: for delayed messages, tagsCode is set to the deliver timestamp
{
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);

// Clamp to the highest configured delay level
if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}

// Compute the deliver timestamp from the delay level and the store timestamp
if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
......
}

ScheduleMessageService

level的个数的设置由MessageStoreConfig的messageDelayLevel决定。rocketmq只能按照这个设置来决定消息的延迟时间。

1
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  1. 初始化:BrokerController在initialize()时候初始化DefaultMessageStore,DefaultMessageStore会初始化ScheduleMessageService,ScheduleMessageService负责定时任务的check和调度。
  2. load加载配置:BrokerController在initialize()会调用DefaultMessageStore的load,在该方法中会调用ScheduleMessageService的load,关键代码如下
  3. 启动:在BrokerController.start后,随着DefaultMessageStore的start而启动。

    加载配置的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

// Key code from DefaultMessageStore: loading the store also loads the ScheduleMessageService
// when one is present. Elided parts are marked with "......".
public boolean load() {
......
if (null != scheduleMessageService) {
// Short-circuit: the schedule service is only loaded if everything before it succeeded
result = result && this.scheduleMessageService.load();
}
......
}

//ScheduleMessageService的代码
/**
 * Loads the state handled by the superclass and then parses the configured delay levels.
 *
 * @return {@code true} only if both the superclass load and the delay-level parsing
 *         succeed; parsing is skipped when the superclass load fails
 */
public boolean load() {
    // && short-circuits, so the delay levels are parsed only after a successful super.load()
    return super.load() && this.parseDelayLevel();
}

/**
 * Parses the configured delay-level string (for example {@code "1s 5s 10s 1m 1h"}) into
 * {@code delayLevelTable} and raises {@code maxDelayLevel} to the number of levels found.
 *
 * <p>Each whitespace-separated entry is a number followed by a one-character unit:
 * {@code s}, {@code m}, {@code h} or {@code d}. The entry at position {@code i}
 * (0-based) becomes level {@code i + 1}, stored as a delay in milliseconds.
 *
 * <p>Entries are staged locally and published only when the whole string parses, so a
 * malformed configuration leaves {@code delayLevelTable} and {@code maxDelayLevel}
 * untouched.
 *
 * @return {@code true} if every entry was parsed; {@code false} if the string is
 *         missing or any entry is malformed (the failure is logged)
 */
public boolean parseDelayLevel() {
    // Milliseconds per supported unit suffix.
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    // Staging area: nothing is published to the shared table unless parsing fully succeeds.
    HashMap<Integer, Long> parsedLevels = new HashMap<Integer, Long>();
    try {
        // Tolerate leading/trailing whitespace and runs of blanks between entries.
        String[] levelArray = levelString.trim().split("\\s+");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            // "1h" --> unit "h" --> 1000L * 60 * 60
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);
            if (null == tu) {
                // Fail with a descriptive message instead of an unboxing NullPointerException.
                throw new IllegalArgumentException("unsupported time unit in delay level '" + value + "'");
            }

            // "1h" --> 1
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            // Levels are 1-based: level --> number * unit, in milliseconds.
            parsedLevels.put(i + 1, tu * num);
        }
    } catch (Exception e) {
        // Any malformed entry (or a null level string) rejects the whole configuration.
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    this.delayLevelTable.putAll(parsedLevels);
    // Levels are numbered 1..n, so the highest level equals the number of parsed entries.
    if (parsedLevels.size() > this.maxDelayLevel) {
        this.maxDelayLevel = parsedLevels.size();
    }

    return true;
}

start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    /**
     * Starts delayed-message processing: schedules one
     * {@code DeliverDelayedMessageTimerTask} per entry of {@code delayLevelTable}, then
     * schedules a fixed-rate task that calls {@code persist()}.
     */
    public void start() {
        // One delivery task per delay level, resuming from the offset recorded for that level
        // (or from 0 when none is recorded). Each first run happens after FIRST_DELAY_TIME.
        for (Map.Entry<Integer, Long> levelEntry : this.delayLevelTable.entrySet()) {
            final Integer delayLevel = levelEntry.getKey();
            final Long delayMillis = levelEntry.getValue();
            final Long savedOffset = this.offsetTable.get(delayLevel);
            final Long startOffset = (savedOffset != null) ? savedOffset : Long.valueOf(0L);

            if (delayMillis == null) {
                continue;
            }
            this.timer.schedule(new DeliverDelayedMessageTimerTask(delayLevel, startOffset), FIRST_DELAY_TIME);
        }

        // Periodic persist(): first run after 10000 ms, then at the configured interval.
        // Failures are logged and swallowed.
        final TimerTask persistTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        };
        this.timer.scheduleAtFixedRate(persistTask, 10000,
            this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
  • DeliverDelayedMessageTimerTask没有采用for/while循环这种方式来保证扫描delayQueue的实时性,而是每次根据处理Message的结果再启动一个DeliverDelayedMessageTimerTask,以此来控制频次并保证实时性。在DeliverDelayedMessageTimerTask的run方法中调用executeOnTimeup,这是主要逻辑
  • 由于延期消息是队列,所以相同粒度的延期消息一定是按照顺序写入到队列中的。所以如果当前消息没到发布时间,后面所有消息就都没到发布时间。具体见下面代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
 * Scans the schedule-topic consume queue of this task's delay level starting at
 * {@code offset}, re-puts every message whose deliver time has been reached, and
 * schedules the follow-up task before returning.
 *
 * <p>Every exit path schedules a new {@code DeliverDelayedMessageTimerTask}:
 * <ul>
 *   <li>an entry that is not due yet: retry from that entry after the remaining countdown;</li>
 *   <li>a failed re-put: retry from that entry after {@code DELAY_FOR_A_PERIOD};</li>
 *   <li>end of the buffer: continue from the next offset after {@code DELAY_FOR_A_WHILE};</li>
 *   <li>no consume queue or no index buffer: retry from {@code failScheduleOffset}
 *       after {@code DELAY_FOR_A_WHILE}.</li>
 * </ul>
 */
public void executeOnTimeup() {
// Look up the consume queue of SCHEDULE_TOPIC that corresponds to this delay level
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));

// Offset used for the fallback reschedule at the bottom of the method
long failScheduleOffset = offset;
if (cq != null) {
// Step 1: fetch the consume-queue index buffer starting at the current offset
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
// Holder that cq.getExt fills with the consume-queue extension data
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// Each iteration reads one index entry: a long, an int, then a long
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
// tagsCode: for the schedule topic this slot holds the deliver timestamp written when the entry was stored
long tagsCode = bufferCQ.getByteBuffer().getLong();

if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}

// Step 2: work out when this message is due
long now = System.currentTimeMillis();
// NOTE(review): correctDeliverTimestamp is not shown here; the original note says a timestamp
// greater than now + the level's delay is treated as already expired - confirm
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

// Queue offset of the entry currently being examined
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// Due: load the full message and deliver it
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);

// If the message cannot be loaded, the entry is skipped and the loop moves on
if (msgExt != null) {
try {
// Build the message to re-put (original note: the real topic is restored here) and store it
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);

// Stored successfully: move on to the next entry
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// Store failed: retry from this entry after DELAY_FOR_A_PERIOD (10 s per the original note)
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
 * XXX: warn and notify me
 */
// The failing message is dropped: the loop continues with the next entry
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// Not due yet: schedule a task for this entry after `countdown` ms instead of rescanning needlessly
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for

// Every entry in this buffer has been scanned: start the next round after DELAY_FOR_A_WHILE
// (100 ms per the original note).
// Original note: entries of one delay level are appended in order, so once an entry is not yet due
// none of the later ones are due either, which is why the not-due branch above returns early.
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {

// Always release the index buffer, including on the early returns above
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {

// No buffer at this offset: if the offset lies before the queue's minimum offset, restart from the minimum
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)

// Fallback: no consume queue or no index buffer - try again later from failScheduleOffset
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}