RocketMq-源码学习-四-消息的发送

消息的生产者类的结构

DefaultMQProducer(TransactionMQProducer后续事务的消费会提到,下面以DefaultMQProducer开始),继承了ClientConfig,实现了接口:MQProducer。

DefaultMQProducer持有DefaultMQProducerImpl对象,DefaultMQProducerImpl实现MQProducerInner接口,

对象初始化

DefaultMQProducer

1
2
3
4
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

DefaultMQProducerImpl

1
2
3
4
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
}

启动

DefaultMQProducer#start(),具体见下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
//启动时候默认是是这个状态防止重复启动
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

//检查producerGroup名称合法性
this.checkConfig();

// 判断是否需要设置 InstanceName (不等于CLIENT_INNER_PRODUCER_GROUP,并且InstanceName是default)
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

//多例模式创建MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

//注册Producer
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

//topic:"TBW102"--->new TopicPublishInfo() 存入topicPublishInfoTable
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

//启动
if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

调用MQClientInstance#start(),见下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
//感觉是服务发现
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel 处理rocktmq的coumser相关的系统事件和CONSUME_MESSAGE_DIRECTLY等信息
this.mQClientAPIImpl.start();
// Start various schedule tasks 服务发现的任务保证服务的发现
this.startScheduledTask();
// Start pull service 启动pullMessageService通过queue拉取线程
this.pullMessageService.start();
// Start rebalance service 每隔一段时间执行一次doReblance方法
this.rebalanceService.start();
// Start push service 启动push服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

至此Procuer启动完成

消息发送的过程

  1. DefaultMQProducer#send–>DefaultMQProducerImpl#send–>DefaultMQProducerImpl#sendOneway、sendDefaultImpl、sendKernelImpl–>MQClientAPIImpl#sendMessage–>nettyRemotingClient#invokeSync、invokeAsync、invokeOneWay
  2. sendOneway->sendDefaultImpl->sendKernelImpl

重点的类有

  • DefaultMQProducerImpl-生产者的实现
  • MQClientAPIImpl-MqClientApi的实现,包含生产者和消费者部分
  • nettyRemotingClient

DefaultMQProducerImpl

重点方法获取topicPublishInfo–tryToFindTopicPublishInfo

根据message中的topic属性获取本地TopicPublishInfo,如果本地没用去nameServer获取,发送RequestCode为GET_ROUTEINTO_BY_TOPIC的请求获取topicRouteData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//没有或者messageQueueList为空去nameserv获取
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//重点
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

//mQClientFactory#updateTopicRouteInfoFromNameServer
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
//获取路由信息
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//发送RequestCode为GET_ROUTEINTO_BY_TOPIC的请求获取topicRouteData
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
......
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
......
//如果发生改变将TopicRouteData转成TopicPublishInfo 和 subscribeInfo 更新
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

......
return true;
}
} else {
......
}
} catch (Exception e) {
......
} finally {
this.lockNamesrv.unlock();
}
} else {
......
}
} catch (InterruptedException e) {
......
}
return false;
}

sendKernelImpl

他们的封装关系:
sendOneway->sendDefaultImpl->sendKernelImpl,这3种方式最终都会调用到sendKernelImpl。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* @param msg
* @param mq write-message-queue
* @param communicationMode async sync oneway
* @param sendCallback
* @param topicPublishInfo
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//得到BrokerAddr,没有在更新topic
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
//根据配置返回VipChannel
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

//压缩单体的消息messagebody
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}

//是否事务消息
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

//执行CheckForbiddenHook
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
......
this.executeCheckForbiddenHook(checkForbiddenContext);
}

//执行SendMessageHookBefore
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
......
this.executeSendMessageHookBefore(context);
}

//构建requestHeader
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
//如果是重复消费的次数
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

//发送消息
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
//如果消息被压缩过,msgbody为原来的消息
// 否则如果这个消息是发送失败的,并且消息体依然大于压缩阈值,被压缩了俩次。
// 所以tmpMessage从msgclone了新对象,原Message不变,如果失败了可以通过原message进行重发
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
//todo FIXME?? 这一步是否有必要要,因为有finnaly
msg.setBody(prevBody);
}
......
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}

//执行hook
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
......
} catch (MQBrokerException e) {
......
} catch (InterruptedException e) {
......
} finally {
//因为msg.body已经被压缩过了,所以需要重置body
msg.setBody(prevBody);
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

发送消息-MQClientAPIImpl#sendMessage

MQClientAPIImpl是rocketmq作为client的api实现,这里看发送消息即:sendMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 发送消息给Broker,Broker接收后对消息进行存储 RequestCode是SEND_MESSAGE
*
* @param addr broker地址
* @param brokerName broker名称
* @param msg 消息
* @param requestHeader 消息Header头
* @param timeoutMillis 过期时间
* @param communicationMode 发送类型
* @param sendCallback callback记过
* @param topicPublishInfo publishinfo
* @param instance clientInstance
* @param retryTimesWhenSendFailed 失败重试次数
* @param context 上下文
* @param producer producer
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();

//1.RequestCommand,RequestCode是SEND_MESSAGE,接收者是
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}

// set body
request.setBody(msg.getBody());

switch (communicationMode) {
case ONEWAY:
//单向发送:用nettyRemotingClient的invokeOneway,无返回值
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
//异步发送
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}

return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}

return null;

MessageQueueSelector自定义队列选择器实现消息黏着等功能

在上述的三种send方式,默认是RoundRobin,但是我们也可以通过传入MessageQueueSelector接口的实现队列的选择器将一定规则的队列发送到同一个队列中

MessageQueueSelector 接口

1
2
3
4
5
6
7
8
9
10
public interface MessageQueueSelector {
/**
* 队列选择接口,在sendSelectImpl中调用
* @param mqs topicPublishInfo中的MessageQueues列表
* @param msg message
* @param arg 队列选择器的参数,我们可以依据它来选择队列
* @return
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

DefaultMQProducerImpl#sendSelectImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

//获取topicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//用自定义的selector,从getMessageQueueList选择一个队列
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}

long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
//发送消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}

throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

可以参照代码中的几个示例:SelectMessageQueueByRandom,SelectMessageQueueByMachineRoom,SelectMessageQueueByHash