消息的生产者类的结构
DefaultMQProducer(TransactionMQProducer后续事务的消费会提到,下面以DefaultMQProducer开始),继承了ClientConfig,实现了接口:MQProducer。
DefaultMQProducer持有DefaultMQProducerImpl对象,DefaultMQProducerImpl实现MQProducerInner接口,
对象初始化
DefaultMQProducer
1 | public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { |
DefaultMQProducerImpl
1 | public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { |
启动
DefaultMQProducer#start(),具体见下面
1 | public void start(final boolean startFactory) throws MQClientException { |
调用MQClientInstance#start(),见下面
1 | public void start() throws MQClientException { |
至此Procuer启动完成
消息发送的过程
- DefaultMQProducer#send–>DefaultMQProducerImpl#send–>DefaultMQProducerImpl#sendOneway、sendDefaultImpl、sendKernelImpl–>MQClientAPIImpl#sendMessage–>nettyRemotingClient#invokeSync、invokeAsync、invokeOneWay
- sendOneway->sendDefaultImpl->sendKernelImpl
重点的类有
- DefaultMQProducerImpl-生产者的实现
- MQClientAPIImpl-MqClientApi的实现,包含生产者和消费者部分
- nettyRemotingClient
DefaultMQProducerImpl
重点方法获取topicPublishInfo–tryToFindTopicPublishInfo
根据message中的topic属性获取本地TopicPublishInfo,如果本地没用去nameServer获取,发送RequestCode为GET_ROUTEINTO_BY_TOPIC的请求获取topicRouteData
1 | private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { |
sendKernelImpl
他们的封装关系:
sendOneway->sendDefaultImpl->sendKernelImpl,这3种方式最终都会调用到sendKernelImpl。
1 | /** |
发送消息-MQClientAPIImpl#sendMessage
MQClientAPIImpl是rocketmq作为client的api实现,这里看发送消息即:sendMessage
1 | /** |
MessageQueueSelector自定义队列选择器实现消息黏着等功能
在上述的三种send方式,默认是RoundRobin,但是我们也可以通过传入MessageQueueSelector接口的实现队列的选择器将一定规则的队列发送到同一个队列中
MessageQueueSelector 接口
1 | public interface MessageQueueSelector { |
DefaultMQProducerImpl#sendSelectImpl
1 | private SendResult sendSelectImpl( |
可以参照代码中的几个示例:SelectMessageQueueByRandom,SelectMessageQueueByMachineRoom,SelectMessageQueueByHash