rocketmq对负载均衡的处理
注册
broker发起
1 | private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, |
调用brokerOutApi的registerBrokerAll方法
1 | public List<RegisterBrokerResult> registerBrokerAll( |
NameServ的处理
在DefaultRequestProcessor中processRequest方法
1 | ...... |
registerBoker方法
1 | public RemotingCommand registerBroker(ChannelHandlerContext ctx, |
RouteInfoManager
broker和topic的路由信息管理者,重要属性如下
1 | private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
调用RouteInfoManager的registerBroker
1 | public RegisterBrokerResult registerBroker( |
关键方法:createAndUpdateQueueData
1 | //注册Broker时候 |
Producer负载均衡
DefaultMQProducerImpl
发送消息和负载均衡相关的逻辑是:
先通过topicPublishInfo选择一个MessageQueue;
调用DefaultMQProducerImpl#sendKernelImpl,在之前发送消息前会调用findBrokerAddressInPublish方法根据mq.getBrokerName来从本地内存获取brokerAddr,见下面代码,如果不存在会调用,tryToFindTopicPublishInfo方法来重新加载brokerAddrTable和topicPublishInfo
1 | private SendResult sendKernelImpl(final Message msg, |
tryToFindTopicPublishInfo方法获取TopicPublishInfo,
1 | private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { |
updateTopicRouteInfoFromNameServer方法中去nameServ获取TopicRouteData,并且将TopicRouteData转成TopicPublishInfo
1 | public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, |
NameServer收到GET_ROUTEINTO_BY_TOPIC请求的处理
DefaultRequestProcessor中的
1 | case RequestCode.GET_ROUTEINTO_BY_TOPIC: |
RouteInfoManager#pickupTopicRouteData
关键方法pickupTopicRouteData,通过上述broker的注册
1 | public TopicRouteData pickupTopicRouteData(final String topic) { |
consumer
todo
TopicPublishInfo和TopicRouteData
todo 补充consumer
TopicRouteData保存整个broker的路由信息,通过TopicRouteData转换成TopicPublishInfo。重要属性如下:
1 | /** |
TopicPublishInfo保存了Producer的队列信息,用于发送消息的负载均衡即选择MessageQueue。重要属性如下
1 | //messageQueues |