broker的一个重点就是消息的持久化,它决定这MQ的使用场景、性能、以及可用性
存储消息的调用时序
nettyRemotingServer接到请求–>SendMessageProcessor.processRequest–>DefaultMessageStore.putMessage–>CommitLog.putMessage–>MappedFile.appendMessage–>CommitLog.DefaultAppendMessageCallback.doAppend
最终调用MappedFile的–>MappedByteBuffer来写入文件
SendMessageProcessor
上一节提到的broker在启动过程。其中registerProcessor()方法中第一个就初始化了SendMessageProcessor。
SendMessageProcessor采用模板模式,实现了抽象类AbstractSendMessageProcessor,并且实现了NettyRequestProcessor,它是netty的事件的处理类。
NettyRequestProcessor-processRequest
实现了NettyRequestProcessor接口的processRequest发方法
1 |
|
CommitLog
- commitLog的文件管理
- 消息的存储
putMessage
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
commitLog文件管理
获取最后一个mappeedFile,如果没有创建一个
AllocateMappedFileService–一次创建俩个文件
由DefaultMessageSotre中初始化,传递个MappedFileQueue,类继承了ServiceThread。创建文件采用生产者消费者模式,一次创建俩个文件,当前文件直接返回。后续文件异步创建,提升了性能
1 | /** |
run方法中的关键
1 | public void run() { |
MappedFile
底层采用javanio的RandomAccessFile和mappedByteBuffer,见init方法
1 | private void init(final String fileName, final int fileSize) throws IOException { |
消息的存储
CommitLog#putMessage–>mapFile#appendMessagesInner–>CommitLog.DefaultAppendMessageCallback#doAppend
MappedFileQueue
putMessage方法存消息
1 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) { |
mappedFile#appendMessage
appendMessage–>appendMessagesInner哎方法中调用AppendMessageCallback.doAppend
1 | public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { |
cb#doAppend
1 | /** |
消息的存储–ConsumerQueue
在DefaultMessageStore.load()和findOffset时候会创建ConsumerQueue。
ConsumerQueue
1 | /** |