rocketmq的消息的存储模块,broker角色支持ASYNC_MASTER,SYNC_MASTER,SLAVE
broker的主要属性
命令是
mqbroker -m
1 | namesrvAddr= |
BrokerStartup
程序的入口,调用createBrokerController通过配置文件创建BrokerController,调用start方法启动
createBrokerController
1 | public static BrokerController createBrokerController(String[] args) { |
start()方法
1 | public static BrokerController start(BrokerController controller) { |
BrokerController
关键点
- 构造方法 初始化config对象
- initialize 关键成员的初始化工作
- start 相关的对象的start还有想nameServer注册能
- 关键成员 topicConfigManager、consumerOffsetManager、messageStore、以及remotingServer
构造方法
- brokerOuterAPI:nettyClient–用于broker和外部模块沟通,有几个功能:
- (1)和nameserver交互,进行broker节点的注册和取消;
- (2)和其他broker节点交互;
- topicConfigManager
- consumerOffsetManager
- subscriptionGroupManager
- consumerFilterManager
initialize
1 | public boolean initialize() throws CloneNotSupportedException { |
registerProcessor()方法中各processor的对应关系
SendMessageProcessor
RequestCode.SEND_MESSAGE、RequestCode.SEND_MESSAGE_V2、RequestCode.SEND_BATCH_MESSAGE、RequestCode.CONSUMER_SEND_MSG_BACK、
PullMessageProcessor–处理拉取消息的请求
RequestCode.PULL_MESSAGE
QueryMessageProcessor
RequestCode.QUERY_MESSAGE、RequestCode.VIEW_MESSAGE_BY_ID
ClientManageProcessor–处理客户端的请求,如心跳等
RequestCode.HEART_BEAT、RequestCode.UNREGISTER_CLIENT、RequestCode.CHECK_CLIENT_CONFIG
ConsumerManageProcessor
RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET、RequestCode.QUERY_CONSUMER_OFFSET
EndTransactionProcessor
RequestCode.END_TRANSACTION
AdminBrokerProcessor
registerDefaultProcessor
start
1 | public void start() throws Exception { |
broker心跳机制
- 心跳包发送:
- ClientManageProcessor负责处理producer、consumer、其他broker(topic)发送的心跳包见heartBeat方法
- 清理:
- clientHousekeepingService定期会清理不用的链接