上一篇讲了rocketmq在消费消息时候consumer的流程和原理,本章会将broker做了什么。
ConsumeQueue
这里会涉及到一个关键类是ConsumeQueue,rocketmq为了规避kafka在mq增多时候会导致磁盘IO下降的问题,特有的设计。它是一个逻辑队列,里面保存了CommitLog的index。每次拉取消息broker会根据MessageQueue的Id去里面查找偏移量然后去commitLog拉取消息
它保存的信息格式如下:
其中:消息的起始物理偏移量physical offset(long 8字节)+消息大小size(int 4字节)+tagsCode(long 8字节),每条数据的大小为20个字节,从而每个文件的默认大小为600万个字节。
文件的地址是:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
每个cosumequeue文件的名称fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为600W,当第一个文件满之后创建的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,第三个文件名字为00000000000012000000,起始偏移量为12000000,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件
代码解析
Broker处理PULL_MESSAGE请求
通过PullMessageProcessor#processRequest来处理拉取消息
1 | private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) |
DefaultMessageStore#getMessage
1 | public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, |
读取索引
ConsumerQueue#getIndexBuffer
1 | public SelectMappedBufferResult getIndexBuffer(final long startIndex) { |
ReputService
在broker启动时会启动改线程,没1ms执行一次负责将CommitLog中的新消息的信息记录cosumeQueue和IndexFile文件中。给出关键代码
1 | class ReputMessageService extends ServiceThread { |
CommitLog#checkMessageAndReturnSize校验消息返回message的校验结果
1 | /** |
回到DefaultMessageStore.this.doDispatch(dispatchRequest)方法,会调用CommitLogDispatcher#dispatch
这里分别是CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex
1 | class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { |
在CommitLogDispatcherBuildConsumeQueue中主要方法是putMessagePostionInfo
1 | public void putMessagePositionInfo(DispatchRequest dispatchRequest) { |
ConsumeQueue#putMessagePositionInfoWrapper中将这条消息的索引持久化到consumeQueue中
1 | public void putMessagePositionInfoWrapper(DispatchRequest request) { |