在业务中我们发送定时消息等会用到,延迟队列,rocketmq内置了该功能,帮我实现这种延迟队列。实现很简单
实现方式、原理
- 生成者发送消息时候为消息设置setDelayTimeLevel,mq不能自己指定延时时间而只能采用系统设置好的level
- broker在putmessage时候会将message的real-topic备份,然后将message放到SCHEDULE_TOPIC_XXXX这个topic中,根据delayLevel制定不同的queueId。
- ScheduleMessageService会启动timer定期扫描各个delayLevel的Queue,已经到执行时间的message在将SCHEDULE_TOPIC_XXXX的message移到real-topic中。
缺点:由于是一个timer在扫描SCHEDULE_TOPIC_XXXX下所有queue的消息,如果消息积压过多于可能造成消息的发送延迟。可以考虑每个level用一个线程取扫描(不过也要考虑线程切换的成本导致频繁切换反而降低性能)
关键代码
producer
producer发送消息,设置level
1 | public void setDelayTimeLevel(int level) { |
broker
broker处理putMessage
- CommitLog.putMessage保存信息会将真实的topic的备份放到指定队列
- DefaultMessageStore的ReputMessage在写CounmeQueue的时候会计算DeliverTime并且写入到tagCode中
见代码
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
ScheduleMessageService
level的个数的设置由MessageStoreConfig的messageDelayLevel决定。rocketmq只能按照这个设置来决定消息的延迟时间。
1 | messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; |
- 初始化:BrokerController在initialize()时候初始化DefaultMessageStore,DefaultMessageStore会初始化ScheduleMessageService,ScheduleMessageService负责定时任务的check和调度。
- load加载配置:BrokerController在initialize()会调用DefaultMessageStore的load,在该方法中会调用ScheduleMessageService的load,关键代码如下
启动:在BrokerController.start后,随着DefaultMessageStore的start而启动。
加载配置的过程
1 |
|
start
1 | public void start() { |
- DeliverDelayedMessageTimerTask没有采用for/while循环这种来保证扫描delayQueue的实时性,而是每次根据处理Messagge的结果在启动一
- DeliverDelayedMessageTimerTask来控制频次、和保证实时性。在DeliverDelayedMessageTimerTask的run方法中调用executeOnTimeup这个是主要逻辑
- 由于延期消息是队列,所以相同粒度的延期信息一定是按照顺序写入到队列中的。所以如果当前消息没到发布时间,后面所有消息就都没到发布时间。具体见下面代码
1 | public void executeOnTimeup() { |