liuhao163.github.io

杂七杂八


  • Home

  • Categories

  • Tags

  • Archives

  • Sitemap

RoketMq源码学习-五-消息的消费-下

Posted on 2019-09-15 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

上一篇讲了rocketmq在消费消息时候consumer的流程和原理,本章会将broker做了什么。

ConsumeQueue

这里会涉及到一个关键类是ConsumeQueue,rocketmq为了规避kafka在mq增多时候会导致磁盘IO下降的问题,特有的设计。它是一个逻辑队列,里面保存了CommitLog的index。每次拉取消息broker会根据MessageQueue的Id去里面查找偏移量然后去commitLog拉取消息

它保存的信息格式如下:
avatror
其中:消息的起始物理偏移量physical offset(long 8字节)+消息大小size(int 4字节)+tagsCode(long 8字节),每条数据的大小为20个字节,从而每个文件的默认大小为600万个字节。

文件的地址是:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}

每个cosumequeue文件的名称fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为600W,当第一个文件满之后创建的第二个文件的名字为00000000000006000000,起始偏移量为6000000,以此类推,第三个文件名字为00000000000012000000,起始偏移量为12000000,消息存储的时候会顺序写入文件,当文件满了,写入下一个文件

代码解析

Broker处理PULL_MESSAGE请求

通过PullMessageProcessor#processRequest来处理拉取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
//响应
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
//解析请求头
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
//Reponse setOpaque用于处理响应
response.setOpaque(request.getOpaque());

...... 验证权限

//获取订阅组配置,如果不存在就new一个
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());

......验证订阅组

//见consumer的PullSysFlag.buildSysFlag
final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;

//验证topic权限
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
...验证topic

//验证queueId
if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
......
}

SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
...... 校验过滤方式
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
this.brokerController.getConsumerFilterManager());
}

//关键代码从MessageStore中读取Message
// 这里是根据请求的QueueId也就是MessageQueue的queueId去查询当前的ConsumerQueue对应的消息(没有则创建一个)
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

if (getMessageResult != null) {
//设置response
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

//设置SuggestWhichBrokerId
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}

if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow ,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}

//处理响应值
switch (getMessageResult.getStatus()) {
case FOUND:
response.setCode(ResponseCode.SUCCESS);
break;
case MESSAGE_WAS_REMOVING:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case NO_MATCHED_LOGIC_QUEUE:
case NO_MESSAGE_IN_QUEUE:
if (0 != requestHeader.getQueueOffset()) {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
}
break;
case NO_MATCHED_MESSAGE:
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
break;
case OFFSET_FOUND_NULL:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_OVERFLOW_BADLY:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
log.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",
requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());
break;
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
getMessageResult.getMinOffset(), channel.remoteAddress());
break;
default:
assert false;
break;
}

if (this.hasConsumeMessageHook()) {
.....//执行hook
this.executeConsumeMessageHookBefore(context);
}

switch (response.getCode()) {
case ResponseCode.SUCCESS:
//处理统计信息
this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getMessageCount());

this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
getMessageResult.getBufferTotalSize());

this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();

//重点代码 getMessageResult读取Message并且写入到reponse中
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
(int) (this.brokerController.getMessageStore().now() - beginTimeMills));
response.setBody(r);
} else {
//zero-copy??
try {
FileRegion fileRegion =
new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
getMessageResult.release();
if (!future.isSuccess()) {
log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());
}
}
});
} catch (Throwable e) {
log.error("transfer many message by pagecache exception", e);
getMessageResult.release();
}

response = null;
}
break;
case ResponseCode.PULL_NOT_FOUND:

if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}

String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}

case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
MessageQueue mq = new MessageQueue();
mq.setTopic(requestHeader.getTopic());
mq.setQueueId(requestHeader.getQueueId());
mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());

OffsetMovedEvent event = new OffsetMovedEvent();
event.setConsumerGroup(requestHeader.getConsumerGroup());
event.setMessageQueue(mq);
event.setOffsetRequest(requestHeader.getQueueOffset());
event.setOffsetNew(getMessageResult.getNextBeginOffset());
this.generateOffsetMovedEvent(event);
log.warn(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
responseHeader.getSuggestWhichBrokerId());
} else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
responseHeader.getSuggestWhichBrokerId());
}

break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}

boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
//记录消费的Offset
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}

DefaultMessageStore#getMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
......

long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;

GetMessageResult getResult = new GetMessageResult();

//物理队列最大偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();

//get or create
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
//consumerQueue中消息的最小偏移量和最大偏移量,这里是消息数 校验
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();

if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) {
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) {
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
if (0 == minOffset) {
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
//重点方法,这里的offset是消息的index,代表从这条消息开始读,所以转成真正的offset要*20
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
//标志位
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;

int i = 0;
//一次最大拉取多少条信息这里是自己数,所以是Nums*20()
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//开始读取bufferConsumeQueue步长是20也就是一次读取一条,不超过bufferConsumeQueue的size和设置的最大条数
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
//这就是ConsumerQueue每条消息的长度,
// offsetPy-物理消息偏移量
// sizePy-物理消息长度
// tag的hashCode
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

maxPhyOffsetPulling = offsetPy;

if (nextPhyFileStartOffset != Long.MIN_VALUE) {
//todo ??
if (offsetPy < nextPhyFileStartOffset)
continue;
}

//检验消息是在磁盘还是内存,这里是maxOffsetPy-offsetPy>memary(这里是总物理内存*使用40%),并且检查是否已满
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
isInDisk)) {
break;
}

//消息过滤
boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) {
tagsCode = cqExtUnit.getTagsCode();
} else {
// can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
}
}

//从ConsumerQueue过滤消息
if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}

continue;
}

//根据ConsumeQueue从commitLog中读取消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
if (null == selectResult) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.MESSAGE_WAS_REMOVING;
}
//如果没有消息,滚动到下一个文件开始
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
continue;
}

//从commitLog过滤消息
if (messageFilter != null
&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue;
}

//统计信息
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
//添加到结果中
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}

if (diskFallRecorded) {
long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
}

nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {

bufferConsumeQueue.release();
}
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}

if (GetMessageStatus.FOUND == status) {
this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
} else {
this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
}
long eclipseTime = this.getSystemClock().now() - beginTime;
this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);

getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}

读取索引
ConsumerQueue#getIndexBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
int mappedFileSize = this.mappedFileSize;
long offset = startIndex * CQ_STORE_UNIT_SIZE;
if (offset >= this.getMinLogicOffset()) {
//通过偏移量找到所在的文件
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
if (mappedFile != null) {
//重点方法 pos=offset % mappedFileSize即当前的偏移量
//fileFromOffset+pos,byteBuffer(position:pos,limit:readPosition - pos),size:readPosition - pos,file:this
SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
return result;
}
}
return null;
}

ReputService

在broker启动时会启动改线程,没1ms执行一次负责将CommitLog中的新消息的信息记录cosumeQueue和IndexFile文件中。给出关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class ReputMessageService extends ServiceThread {

private volatile long reputFromOffset = 0;

private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

private void doReput() {
//doNext and reputFromOffset小于broker的最大的可读的offset即最后一个文件的写活提交的offset
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
......

//当前commit文件reputFromOffset到ReadPostion的的byteBuffer
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
//更新reputFromOffset
this.reputFromOffset = result.getStartOffset();

for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// 生成重放消息重放调度请求
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
// -1-失败,0-到文件尾,1-正常
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
if (size > 0) {
//构建ConsumeQueue
DefaultMessageStore.this.doDispatch(dispatchRequest);

if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
//通知有新的消息
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}

//更新reputFromOffset下次构建从新的地方开始,readSize也增加
this.reputFromOffset += size;
readSize += size;
......
} else if (size == 0) {
//见checkMessageAndReturnSize有可能遇到(msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank)这种情况所以返回0
//这里reputFromOffset重置并且指定到下个文件
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
......
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}

}

CommitLog#checkMessageAndReturnSize校验消息返回message的校验结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* check the message and returns the message size
*
* @return 0 Come the end of the file // >0 Normal messages // -1 Message checksum failure
*/
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
final boolean readBody) {
try {
// 1 TOTAL SIZE
int totalSize = byteBuffer.getInt();

// 2 MAGIC CODE
int magicCode = byteBuffer.getInt();
switch (magicCode) {
case MESSAGE_MAGIC_CODE:
break;
case BLANK_MAGIC_CODE:
//当存储消息的时候发现剩余空间不足一条消息时候就会触发这个结果
return new DispatchRequest(0, true /* success */);
default:
log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
return new DispatchRequest(-1, false /* success */);
}
// ......
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
if (readBody) {
//......省略body校验
} else {
byteBuffer.position(byteBuffer.position() + bodyLen);
}
}

......
String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);

......

//read properties
......
Map<String, String> propertiesMap = null;
if (propertiesLength > 0) {
......

propertiesMap = MessageDecoder.string2messageProperties(properties);

keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
if (tags != null && tags.length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
}

// Timing message processing
{
// 延时消息将tagsCode设置为时间戳,为什么tagsCode要设置为时间戳后续讲延时消息的时候会分析
String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
int delayLevel = Integer.parseInt(t);

if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
}

if (delayLevel > 0) {
tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
storeTimestamp);
}
}
}
}

int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
if (totalSize != readLength) {
doNothingForDeadCode(reconsumeTimes);
doNothingForDeadCode(flag);
doNothingForDeadCode(bornTimeStamp);
doNothingForDeadCode(byteBuffer1);
doNothingForDeadCode(byteBuffer2);
log.error(
"[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
totalSize, readLength, bodyLen, topicLen, propertiesLength);
return new DispatchRequest(totalSize, false/* success */);
}

return new DispatchRequest(
topic,//
queueId,//这条消息的queueId
physicOffset,
totalSize,
tagsCode,
storeTimestamp,
queueOffset,
keys,
uniqKey,
sysFlag,
preparedTransactionOffset,
propertiesMap
);
} catch (Exception e) {
}

return new DispatchRequest(-1, false /* success */);
}

回到DefaultMessageStore.this.doDispatch(dispatchRequest)方法,会调用CommitLogDispatcher#dispatch

这里分别是CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}

//以后讲indexFile时候会讲。大致就是支持随机访问为了性能所以需要IndexFile
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

@Override
public void dispatch(DispatchRequest request) {
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}

在CommitLogDispatcherBuildConsumeQueue中主要方法是putMessagePostionInfo

1
2
3
4
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
cq.putMessagePositionInfoWrapper(dispatchRequest);
}

ConsumeQueue#putMessagePositionInfoWrapper中将这条消息的索引持久化到consumeQueue中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void putMessagePositionInfoWrapper(DispatchRequest request) {
......
for (int i = 0; i < maxRetries && canWrite; i++) {
......
//关键代码
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());

......处理结果
}
......
}

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
//校验
if (offset <= this.maxPhysicOffset) {
return true;
}

//将这条消息的offset size tag写到consumeQueue文件中
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
......
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}

RoketMq源码学习-五-消息的消费-上

Posted on 2019-09-15 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

关于消息的消费分为consumer和broker俩个模块,会分为俩篇文章阐述

消费者按照消费方式的不同分为:

  • MQPushConsumer实现类是:DefaultMQPushConsumer,由broker推送消息给consumer
  • MQPullConsumer实现类是:DefaultMQPullConsumer,定期去borker拉取消息

    消费消息的逻辑通过接口MessageListener,它有俩个实现接口

  • MessageListenerConcurrently:并发的消费

  • MessageListenerOrderly:保序消费,

    消费分为俩种模式MessageModel,即:

  • CLUSTERING:集群,默认

  • BROADCASTING:广播

DefaultMQPushConsumer

类图:

avator

整个流程如下:new DefaultMQPushConsumer()–>subscribe–>start()

create以及重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.消费端实现的队列选择器
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

主要属性

  • defaultMQPushConsumerImpl:内部实现
  • consumerGroup:消费组
  • messageModel:消费方式默认是CLUSTERING、BROADCASTING:广播
  • consumeFromWhere,消费者启动前要设置ConsumeFromWhere,他们的含义是
    • ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:新消费第一次启动时候从最开始的Offset开始消费
    • ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:新消费第一次启动时候从最近的Offset开始消费
    • ConsumeFromWhere.CONSUME_FROM_TIMESTAMP::新消费第一次启动时候从最时间戳开始消费
  • consumeTimestamp:默认半小时前和CONSUME_FROM_TIMESTAMP配合使用
  • allocateMessageQueueStrategy:消费者端的队列选择器,消费者从特定的queue消费消息’
  • subscription:tag的过滤器
  • messageListener:负责消息的消费
  • offsetStore:负责存储offset
  • consumeThreadMin和consumeThreadMax:消费消息启动的最小、最大线程数
  • adjustThreadPoolNumsThreshold:当消息堆积到该阈值时候会动态的调整消费线程数
  • consumeConcurrentlyMaxSpan:当队列允许的最大offset的跨度,到达后触发限流。只对并发消费(ConsumeMessageConcurrentlyService)有效
  • pullThresholdForQueue:每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consumerqueue限流
  • pullThresholdForTopic:同上只是这个是针对topic的
  • pullThresholdSizeForQueue和pullThresholdSizeForTopic:限制消息缓存的大小单位是MiB,默认无限制
  • pullInterval:拉取消息队列的间隔,默认是0
  • consumeMessageBatchMaxSize:批量消费消息的条数,messageListener的方法中是个List[<]MessageExt[>] msgs,这个值就是控制msgs的的数量
  • pullBatchSize:去broker拉取消息一次多少条,和上面的属性关系一次拉取pullBatchSize条,分成pullBatchSize/consumeMessageBatchMaxSize个任务去消费 ??
  • postSubscriptionWhenPull:每次拉取的时候是否更新订阅关系
  • unitMode:
  • maxReconsumeTimes:重试次数,达到会进入到私信队列,默认是是-1
    • 并行:16次
    • 串行:无限次。一直等到消费成功后才会笑一次
  • suspendCurrentQueueTimeMillis:串行消费重试的建个
  • consumeTimeout:消费过期时间

subscribe-订阅

通过调用DefaultMQPushConsumerImpl#subscribe方法实现订阅

1
2
3
4
5
6
7
8
9
10
11
12
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

start

DefaultMQPushConsumer#start()会调用DefaultMQPushConsumerImpl#start(),即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
//防止重复启动先改状态
this.serviceState = ServiceState.START_FAILED;

//消费者的配置校验
this.checkConfig();

//处理subcription和messageListenerInner
this.copySubscription();

//defaultMQPushConsumer的InstanceName改为PID
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}

//创建MQClient和生产者共用一个类
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

//
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//默认是AllocateMessageQueueAveragely
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

//初始化拉取消息的Api封装类
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

//初始化OffsetStore,根据消息类型,如果是广播LocalFileOffsetStore,如果是集群消息需要RemoteBrokerOffsetStore
//RemoteBrokerOffsetStore 从broker获取topic的
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加载offset
this.offsetStore.load();

//根据消费方式初始化并且启动consumeMessageService todo 重点看
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();

//mqClinetInstance注册消费者到内存中
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

/**
* 如果订阅关系存在通过ReblanceImpl获取订阅关系rebalanceImpl#getSubscriptionInner,获取订阅关系
* 如果订阅关系不为空。通过NameServer获取TopicRouteInfo并且更新rebalanceImpl中的topicSubscribeInfoTable
*/
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//去broker校验Consumer状态,包括consumer的substring的有孝心等
this.mQClientFactory.checkClientInBroker();
//给所有的Broker发送心跳包(code是HEART_BEAT),包括遍历每个消费者生成Consumer信息包括,消费这的类型、消息模式、FromWhere、订阅信息等,
// broker发送HEAT_BEAT请求,完成消费者的注册等工作
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 1.mQClientFactory#rebalanceImmediately唤醒RebalanceService
// 2.RebalanceService#run调用mQClientFactory#doRebalance
// 3.mQClientFactory#doRebalance遍历ConsumerTable(上面registerConsumer方法put的consumer),调用DefaultMQPushConsumerImpl#doRebalance
// 4.DefaultMQPushConsumerImpl#doRebalance调用RebalanceImpl.doRebalance进行遍历
// 5.**还有个重要的目的通过dorebalnce激活pullmessageService的run方法,在死循环中向broker发送pullMessage的消息
this.mQClientFactory.rebalanceImmediately();
}

broker在收到HEAT_BEAT的请求后,会调用heatbeat方法,该方法同时处理prodcuer和consumer的注册,其中consumer的注册是调用this.brokerController#getConsumerManager#registerConsumer的方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
...... //解析协议
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

//创建RetryTopic:%RETRY%ConsumerGroup,给所有的broker发送信息注册这个topic
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),//默认是1
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}

//broker通过consumerManager注册消费者
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
......
}

/**
* 注册消费者
*
* @param group 消费组
* @param clientChannelInfo 客户端链接的信息
* @param consumeType 消费类型 push or pull
* @param messageModel 消息类型
* @param consumeFromWhere fromWhere
* @param subList 订阅关系的列表,每个consumer的RebalanceImpl#subscriptionInner在注册时候put进去的信息
* @param isNotifyConsumerIdsChangedEnable 是否通知客户端的开关
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
//get or create ConsumerGroupInfo
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}

//更新clientChannelInfo
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);

//更新SubscriptionData,将remote的更新到本地,并且清理掉已经不存在的SubscriptionData
boolean r2 = consumerGroupInfo.updateSubscription(subList);

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
//通知所有的消费者,该group订阅关系发送变化 发送NOTIFY_CONSUMER_IDS_CHANGED事件,所有该group下的consumer会立刻做doBalance操作
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}

//consumerIdsChangeListener处理注册事件,调用getConsumerFilterManager().register todo mark 待看
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

return r1 || r2;
}

遗留问题:this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

SubscriptionData

rebalanceImpl

消息的消费

虽然叫做DefaultMQPushConusmerImpl,但是消息的消费实际上是一个pull的过程,那么消息是如何保证实时性的呢?

消息的消费分为拉取消息和消费消息里俩步,拉取消息是PullMessageService类来负责,消费消息ConsumeMessageService来负责

在PullMessageService中是一个用阻塞队列实现的典型的生产者消费者模式实现的。见代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

@Override
public void run() {
log.info(this.getServiceName() + " service started");

//死循环,从队列中拉取消息
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

其中executePullRequestImmediately就是触发PullRequest的。

  1. 当我们的消费者start()之后会调用mQClientFactory.rebalanceImmediately(),在跟进该方法的过程中发现updateProcessQueueTableInRebalance方法,最终会调用dispatchPullRequest该方法会调用executePullRequestImmediately实际上就触发了消息的拉取
  2. run会调用DefaultMQPushConsumerImpl的pullMessage方法在改方法中通过pullAPIWrapper.pullKernelImpl去broker拉取消息。
  3. 在拉取之后并且在pullCallback中调用ConsumeMessageService消费消息。并且重新调用executePullRequestImmediately重复上述2。

综上,rocketmq就是用这种不停的take–>pull–>add来保证消息的实时性的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
//省略校验操作
......
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

//FOUND NO_NEW_MSG NO_MATCHED_MSG OFFSET_ILLEGAL都会将pullRequest重新放入到pullRequestQueue对列中,
//然后PullMessageService的run()方法在进行下一次的拉取。
//PullMessageService的run方法是个死循环,通过pullRequestQueue.take来保证实时性。
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
//如果msgFoundList为空
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
//
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//消息的消费逻辑
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);

//根据设置进行下一次的拉取任务
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

//
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

//
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:

log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);
//OFFSET_ILLEGAL 在10s后会通过rebalanceImpl#removeProcessQueue摘除messageQueue
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}

classFilter = sd.isClassFilterMode();
}

int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);

//重点:想broker发送PULL_MESSAGE的请求拉取消息,拉取后将response转成PullResultExt然后经过pullCallback处理拉取结果
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}

拉取消息在consumer测的过程

  1. pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl#pullMessage方法给broker发送RequestCode.PULL_MESSAGE请求
  2. 发送方式默认是ASYNC,所以在pullMessageAsync方法中会处理将response转成PullResultExt对象,这时候MsgList是空。
    • 通过调用 processPullResponse()方法来转换
  3. 之后在InvokeCallback回调中调用pullCallback完成消费

pullMessageAsync具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void pullMessageAsync(
final String addr,
final RemotingCommand request,//pullmessage请求
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
//pullCallback.onException...省略
}
}
});
}

processPullResponse具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private PullResult processPullResponse(
final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
//根据repsonse转换code
switch (response.getCode()) {
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;

default:
throw new MQBrokerException(response.getCode(), response.getRemark());
}

PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}

consumer消费消息

拉取到消息后,在PullCallBack中会调用consumeMessageService.submitConsumeRequest来消费消息,实际上是封装成ConsumeRequest在线程池中批量消费消息,具体见下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;

public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}

public List<MessageExt> getMsgs() {
return msgs;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

@Override
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}

//自定义的消费逻辑
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}

//执行consumer的hook
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}

if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

if (!processQueue.isDropped()) {
//处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

}

顺序消费

和Currently类似,不过首先对消息进行排序,首先将消息放入到processQueue的红黑树中,另外在消费时候先加锁,所粒度是MessageQueue,保证每个MessageQueue的顺序是一致的。如代码:

由于consumer只能做到单messageQueue消息有序,我们在producer侧生产消息时候应该按照我们的保序字段selectQueue才行,保证相同id的都打到一个messageQueue中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;

//ConsumeRequest只是面向processQueue和messageQueue的
//这里设计的很不错,由于顺序消费针对MessageQueue所以单Queue的消费是线性的,为了提高吞吐,采用了ProcessQueue镜像队列,用红黑树的思想对消息offset排序然后压进队列,ConsumeRequest不停的消费ProcessQueue里弹出来的消息
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

@Override
public void run() {
//......

//关键代码锁机制,锁粒度是messageQueue,一个MessageQueue对应一个锁,也就是单一的messageQueue消费有序,需要发送时候选择messageQueue
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
//todo 大哥你用个while不行么!!!一直从processQueue中弹出来BatchSize个消息,进行消费知道空为止
//因为,不同的pullMessage会不停的给processQueue中压值知道满足阈值为止。
for (boolean continueConsume = true; continueConsume; ) {
//....省略校验逻辑

final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

//从processQueue的红黑树按照顺序take出consumeBatchSize个消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//保证线程安全,保证每个消息消费的线程安全
this.processQueue.getLockConsume().lock();
//.....
//消费逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//......
hasException = true;
} finally {
//解锁
this.processQueue.getLockConsume().unlock();
}

// 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
// ......
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}

}

由于顺序消费为每个MessageQueue要加锁,为了创建大量的无用线程,在processQueue#putMessage方法会判断consuming的状态,如果当前processQueue已经在消费中了,消息只是放入红黑树但是不会启动线程去消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 将Messages放到红黑树中
* @param msgs message信息
* @return 如果msgTreeMap不为空,且当前不在消费返回true,为true启动线程开始消费,如果消费中就不在启动线程了防止高并发情况下大量的异步线程将线程池打满
*/
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
//锁因为msgTreeMap不是线程安全的
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);

//message不为空,并且当前没有consuming.
// 因为顺序消费是启动一个线程一直processQueue中弹Message到processQueue为空
// 所以当前红黑树不为空,并且不在消费中返回true,起线程开始消费,如果消费中就不在启动线程了防止高并发情况下大量的异步线程将线程池打满
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}

if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}

return dispatchToConsume;
}

处理消费请求

ConsumeMessageConcurrentlyService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;

switch (status) {
case CONSUME_SUCCESS:
//ackIndex取consumeRequest.getMsgs()最后的一个元素小表
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;//size
int failed = consumeRequest.getMsgs().size() - ok;//0
//统计
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
//遍历message列表,发送RequestCode.CONSUMER_SEND_MSG_BACK请求给broker,如果有异常,加入到msgBackFailed
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

//消费请求中去掉发送CONSUMER_SEND_MSG_BACK的消息,然后对这些发送msgBack失败的消息在消费一遍。
//这里就是为什么我们程序要做幂等行的判断
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

//更新内存中的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

orderly和上述类似不过有一个提交过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
//提交
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

return continueConsume;
}

RocketMq-源码学习-四-消息的发送

Posted on 2019-09-14 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

消息的生产者类的结构

DefaultMQProducer(TransactionMQProducer后续事务的消费会提到,下面以DefaultMQProducer开始),继承了ClientConfig,实现了接口:MQProducer。

DefaultMQProducer持有DefaultMQProducerImpl对象,DefaultMQProducerImpl实现MQProducerInner接口,

对象初始化

DefaultMQProducer

1
2
3
4
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

DefaultMQProducerImpl

1
2
3
4
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
}

启动

DefaultMQProducer#start(),具体见下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
//启动时候默认是是这个状态防止重复启动
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;

//检查producerGroup名称合法性
this.checkConfig();

// 判断是否需要设置 InstanceName (不等于CLIENT_INNER_PRODUCER_GROUP,并且InstanceName是default)
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}

//多例模式创建MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

//注册Producer
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

//topic:"TBW102"--->new TopicPublishInfo() 存入topicPublishInfoTable
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

//启动
if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

调用MQClientInstance#start(),见下面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
//感觉是服务发现
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel 处理rocktmq的coumser相关的系统事件和CONSUME_MESSAGE_DIRECTLY等信息
this.mQClientAPIImpl.start();
// Start various schedule tasks 服务发现的任务保证服务的发现
this.startScheduledTask();
// Start pull service 启动pullMessageService通过queue拉取线程
this.pullMessageService.start();
// Start rebalance service 每隔一段时间执行一次doReblance方法
this.rebalanceService.start();
// Start push service 启动push服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}

至此Procuer启动完成

消息发送的过程

  1. DefaultMQProducer#send–>DefaultMQProducerImpl#send–>DefaultMQProducerImpl#sendOneway、sendDefaultImpl、sendKernelImpl–>MQClientAPIImpl#sendMessage–>nettyRemotingClient#invokeSync、invokeAsync、invokeOneWay
  2. sendOneway->sendDefaultImpl->sendKernelImpl

重点的类有

  • DefaultMQProducerImpl-生产者的实现
  • MQClientAPIImpl-MqClientApi的实现,包含生产者和消费者部分
  • nettyRemotingClient

DefaultMQProducerImpl

重点方法获取topicPublishInfo–tryToFindTopicPublishInfo

根据message中的topic属性获取本地TopicPublishInfo,如果本地没用去nameServer获取,发送RequestCode为GET_ROUTEINTO_BY_TOPIC的请求获取topicRouteData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//没有或者messageQueueList为空去nameserv获取
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//重点
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

//mQClientFactory#updateTopicRouteInfoFromNameServer
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
//获取路由信息
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
//发送RequestCode为GET_ROUTEINTO_BY_TOPIC的请求获取topicRouteData
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
......
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
......
//如果发生改变将TopicRouteData转成TopicPublishInfo 和 subscribeInfo 更新
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}

......
return true;
}
} else {
......
}
} catch (Exception e) {
......
} finally {
this.lockNamesrv.unlock();
}
} else {
......
}
} catch (InterruptedException e) {
......
}
return false;
}

sendKernelImpl

他们的封装关系:
sendOneway->sendDefaultImpl->sendKernelImpl,这3种方式最终都会调用到sendKernelImpl。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
* @param msg
* @param mq write-message-queue
* @param communicationMode async sync oneway
* @param sendCallback
* @param topicPublishInfo
* @param timeout
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//得到BrokerAddr,没有在更新topic
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
//根据配置返回VipChannel
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}

//压缩单体的消息messagebody
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}

//是否事务消息
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}

//执行CheckForbiddenHook
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
......
this.executeCheckForbiddenHook(checkForbiddenContext);
}

//执行SendMessageHookBefore
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
......
this.executeSendMessageHookBefore(context);
}

//构建requestHeader
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
//如果是重复消费的次数
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}

//发送消息
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
Message tmpMessage = msg;
//如果消息被压缩过,msgbody为原来的消息
// 否则如果这个消息是发送失败的,并且消息体依然大于压缩阈值,被压缩了俩次。
// 所以tmpMessage从msgclone了新对象,原Message不变,如果失败了可以通过原message进行重发
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
//todo FIXME?? 这一步是否有必要要,因为有finnaly
msg.setBody(prevBody);
}
......
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}

//执行hook
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
......
} catch (MQBrokerException e) {
......
} catch (InterruptedException e) {
......
} finally {
//因为msg.body已经被压缩过了,所以需要重置body
msg.setBody(prevBody);
}
}

throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

发送消息-MQClientAPIImpl#sendMessage

MQClientAPIImpl是rocketmq作为client的api实现,这里看发送消息即:sendMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 发送消息给Broker,Broker接收后对消息进行存储 RequestCode是SEND_MESSAGE
*
* @param addr broker地址
* @param brokerName broker名称
* @param msg 消息
* @param requestHeader 消息Header头
* @param timeoutMillis 过期时间
* @param communicationMode 发送类型
* @param sendCallback callback记过
* @param topicPublishInfo publishinfo
* @param instance clientInstance
* @param retryTimesWhenSendFailed 失败重试次数
* @param context 上下文
* @param producer producer
* @return
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
public SendResult sendMessage(
final String addr,
final String brokerName,
final Message msg,
final SendMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();

//1.RequestCommand,RequestCode是SEND_MESSAGE,接收者是
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
} else {
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
}

// set body
request.setBody(msg.getBody());

switch (communicationMode) {
case ONEWAY:
//单向发送:用nettyRemotingClient的invokeOneway,无返回值
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
//异步发送
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}

return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}

return null;

MessageQueueSelector自定义队列选择器实现消息黏着等功能

在上述的三种send方式,默认是RoundRobin,但是我们也可以通过传入MessageQueueSelector接口的实现队列的选择器将一定规则的队列发送到同一个队列中

MessageQueueSelector 接口

1
2
3
4
5
6
7
8
9
10
public interface MessageQueueSelector {
/**
* 队列选择接口,在sendSelectImpl中调用
* @param mqs topicPublishInfo中的MessageQueues列表
* @param msg message
* @param arg 队列选择器的参数,我们可以依据它来选择队列
* @return
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

DefaultMQProducerImpl#sendSelectImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);

//获取topicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//用自定义的selector,从getMessageQueueList选择一个队列
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}

long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
//发送消息
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}

throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

可以参照代码中的几个示例:SelectMessageQueueByRandom,SelectMessageQueueByMachineRoom,SelectMessageQueueByHash

RocketMq-源码学习-三-Store模块消息的存储(**持久化消息队列的重点**)

Posted on 2019-09-13 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

broker的一个重点就是消息的持久化,它决定这MQ的使用场景、性能、以及可用性

存储消息的调用时序

nettyRemotingServer接到请求–>SendMessageProcessor.processRequest–>DefaultMessageStore.putMessage–>CommitLog.putMessage–>MappedFile.appendMessage–>CommitLog.DefaultAppendMessageCallback.doAppend
最终调用MappedFile的–>MappedByteBuffer来写入文件

SendMessageProcessor

上一节提到的broker在启动过程。其中registerProcessor()方法中第一个就初始化了SendMessageProcessor。
SendMessageProcessor采用模板模式,实现了抽象类AbstractSendMessageProcessor,并且实现了NettyRequestProcessor,它是netty的事件的处理类。

NettyRequestProcessor-processRequest

实现了NettyRequestProcessor接口的processRequest发方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}

mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}

this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}

CommitLog

  • commitLog的文件管理
  • 消息的存储

putMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
......
//获取消息的事物类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//延时消息
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId 将消息设置成topic
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

......
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

putMessageLock.lock();
try {

......

//获取mappedFile 如果mappedFile或者为空,createAndGet
if (null == mappedFile || mappedFile.isFull()) {
//getOrCreate
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
}

......

result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}

if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);

return putMessageResult;
}

commitLog文件管理

获取最后一个mappeedFile,如果没有创建一个

AllocateMappedFileService–一次创建俩个文件

由DefaultMessageSotre中初始化,传递个MappedFileQueue,类继承了ServiceThread。创建文件采用生产者消费者模式,一次创建俩个文件,当前文件直接返回。后续文件异步创建,提升了性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* 生产者消费者模式。同步的放文件到队列中,线程中异步的去创建文件,
* @param nextFilePath
* @param nextNextFilePath
* @param fileSize
* @return
*/
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {

......
//nextFile的异步请求
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
......
//丢到队列中
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}

//nextNextFile的异步请求
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}

//一次创建俩个文件nextFile和nextNextFile,nextFile异步转同步返回,nextNextFile下一次创建时候可以直接到这里,
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}

return null;
}

run方法中的关键

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void run() {
log.info(this.getServiceName() + " service started");
//关键方法
while (!this.isStopped() && this.mmapOperation()) {

}
log.info(this.getServiceName() + " service end");
}

private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
//获取请求
req = this.requestQueue.take();
//校验请求状态
......

if (req.getMappedFile() == null) {
......
//俩种方式TransientStorePool和MappedFileByteBuffer
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
// pre write mappedFile
......
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
......
return false;//被中断返回false
} catch (IOException e) {
......
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
//完成
}
return true;
}

MappedFile

底层采用javanio的RandomAccessFile和mappedByteBuffer,见init方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;

ensureDirOK(this.file.getParent());

try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}

消息的存储

CommitLog#putMessage–>mapFile#appendMessagesInner–>CommitLog.DefaultAppendMessageCallback#doAppend

MappedFileQueue

putMessage方法存消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
...校验消息以及设置消息里面的属性...
//获取消息的事物类型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}

//延时消息
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Backup real topic, queueId 将消息设置成topic
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

msg.setTopic(topic);
msg.setQueueId(queueId);
}
}

putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
......
//获取mappedFile 如果mappedFile或者为空,createAndGet
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
......
//mappedFile的put
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
//文件不够发送该消息,创建个新文件,并且重发
unlockMappedFile = mappedFile;
//因为上一条消息woritePosition改变,已经满了,所以会已新的哦createPosition创建文件
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}

eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}

if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
......
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
......
return putMessageResult;
}

mappedFile#appendMessage

appendMessage–>appendMessagesInner哎方法中调用AppendMessageCallback.doAppend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;

//写的position,如果还有空余
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {
//缓冲区切片,获取子缓冲区,position=0,起始位置是主缓冲区的position,容量是limit-position看,这里是指写缓冲区中,剩下的缓冲区。
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
//设置当前消息写的position
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {
//文件开始位置,写的bytebuffer,剩余位置,消息体
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

//
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
}
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

cb#doAppend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/**
*
* @param fileFromOffset 文件其实的自己数1G的整数倍
* @param byteBuffer mappedByteBuffer的子缓冲区
* @param maxBlank 空余byteslength
* @param msgInner 消息体
* @return
*/
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner) {

// PHY OFFSET =文件起始的位置+当前写的位置
long wroteOffset = fileFromOffset + byteBuffer.position();

//计算msgId
//msgIdMemory+16位 MSG_ID_LENGTH=8+8具体见下面
//msgInner.getStoreHostBytes(hostHolder)总共8位=Addr数组+port
//wroteOffset long类型占8位
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString(); // topic-queueid
//get queue current offset
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset) {
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
}

// Transaction messages that require special handling 处理事物消息待看
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
// Prepared and Rollback message is not consumed, will not enter the
// consumer queuec
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
queueOffset = 0L;
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
default:
break;
}

//获取properties
final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
if (propertiesLength > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long. length={}", propertiesData.length);
return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
}
//获取topic信息
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

//计算消息的长度
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

// Exceeds the maximum message 默认是512k
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
+ ", maxMessageSize: " + this.maxMessageSize);
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
}

// Determines whether there is sufficient free space 文件内容不足以写消息返回END_OF_FILE类型的result
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}

// Initialization of storage space 初始化读取空间先flip在设置limit,写消息
......
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
//处理事务
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);
break;
default:
break;
}
return result;
}

消息的存储–ConsumerQueue

在DefaultMessageStore.load()和findOffset时候会创建ConsumerQueue。

ConsumerQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 创建新的逻辑队列ConsumeQueue和ConsumeQueueExt
*
* @param topic topic
* @param queueId queueId
* @param storePath storePathRootDir/consumerqueue/topic/queueId
* storePathRootDir/consumerqueu_ext/topic/queueId
* @param mappedFileSize 大小是30W消息*每条20字节
* @param defaultMessageStore
*/
public ConsumeQueue(
final String topic,
final int queueId,
final String storePath,
final int mappedFileSize,
final DefaultMessageStore defaultMessageStore) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.defaultMessageStore = defaultMessageStore;

this.topic = topic;
this.queueId = queueId;

String queueDir = this.storePath
+ File.separator + topic
+ File.separator + queueId;

this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
this.consumeQueueExt = new ConsumeQueueExt(
topic,
queueId,
StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
);
}
}

RocketMq-源码学习-二-Nameserv

Posted on 2019-09-12 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

nameserv是服务发现中心,底层采用netty监听

NamesrvStartup

服务的启动类,通过命令行参数初始化NamesrvController,并且启动NamesrvController。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();

Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}

//初始化nameservconfig和netteyServerConfig
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();

...

//关键代码,创建NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}

start方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static NamesrvController start(final NamesrvController controller) throws Exception {
......

//初始化配置
boolean initResult = controller.initialize();

......

Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

//关键代码start
controller.start();

return controller;
}

NamesrvController

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
//nameserver的配置
this.namesrvConfig = namesrvConfig;
//netty相关的配置
this.nettyServerConfig = nettyServerConfig;
//kv存储的管理类
this.kvConfigManager = new KVConfigManager(this);
//broker topic的信息管理
this.routeInfoManager = new RouteInfoManager();
//broker心跳的类,是ChannelEventListener的实现类,Netty的事件触发后处理事件的详细的类,具体调用见NettyConnectManageHandler
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
//storePath的保存
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

initialize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public boolean initialize() {

//kv设置load到内存
this.kvConfigManager.load();

//核心:创建nettyserver
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

//远程命令处理类,具体调用在NettyRemotingAbstract的processMessageReceived里
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();

//扫描未活跃的broker并且移除他们每10秒一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);

//打印所有kvstore的kv
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

//tls--监控tls的文件状态如果修改了重新reload--一个私有方法reloadServerSslContext
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
......见源码
);
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}

return true;
}

start

1
2
3
4
5
6
7
8
9
public void start() throws Exception {
//start servbice
this.remotingServer.start();

//监听ssl的变化,如果有变化remotingServer.loadSslContext()
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}

remotingServer–NettyRemotingServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});

ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)//bossgruop和workgroup
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)//listerner的长度,过小会导致tcp链接简历失败
.option(ChannelOption.SO_REUSEADDR, true)//服务器端口可被重用
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)//禁用Nagle算法,有效提高网络负载
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//发送缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())//接受缓冲区大小
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))//设置端口
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//注意:使用第三个EventGroup,defaultEventExecutorGroup,建立链接、读写任务分开
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),//MessageToByteEncoder
new NettyDecoder(),//LengthFieldBasedFrameDecoder具体见netty,解决粘包问题
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//心跳检测机制
new NettyConnectManageHandler(),//处理io链接相关的事件注意啊channelActive,channelInActive,userEventTriggered(心跳),exceptionCaught
new NettyServerHandler()//处理request和response信息
);
}
});

if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
//重用缓冲区
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

try {
//监听端口
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}

//netty的事件处理类
// 1.nettyEventExecutor是一个线程,run里是死循环,nettyEventExecutor维护了个阻塞队列,采用生产者消费者模式。putNettEvent放事件,线程异步消费事件
// 2.事件来了后,通过NettyRemotingServer.this.channelEventListener方法类处理事件
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}

this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}

RocketMq-源码学习-一-Broker

Posted on 2019-09-11 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

rocketmq的消息的存储模块,broker角色支持ASYNC_MASTER,SYNC_MASTER,SLAVE

broker的主要属性

命令是
mqbroker -m

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
namesrvAddr=
brokerIP1=10.96.83.21
brokerName=localhost
brokerClusterName=DefaultCluster
brokerId=0
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
traceTopicEnable=false
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
transactionTimeOut=6000
transactionCheckMax=15
transactionCheckInterval=60000
aclEnable=false
storePathRootDir=/root/store
storePathCommitLog=/root/store/commitlog
flushIntervalCommitLog=500
commitIntervalCommitLog=200
flushCommitLogTimed=false
deleteWhen=04
fileReservedTime=72
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
messageIndexSafe=false
haMasterAddress=
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
cleanFileForciblyEnable=true
transientStorePoolEnable=false

BrokerStartup

程序的入口,调用createBrokerController通过配置文件创建BrokerController,调用start方法启动

createBrokerController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public static BrokerController createBrokerController(String[] args) {
......
try {
......
//broker既是netty的server端又是客户端
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();

nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
//默认监听10911端口
nettyServerConfig.setListenPort(10911);
//比较重要,消息存储的配置,Ha端口监听10911,即10911+1 //见HMessageStoreConfig中的AService
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
......//初始化配置并且进行配置检查

//初始化BrokerController
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

//重要BrokerController的一些初始化操作
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);

@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));

return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

start()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static BrokerController start(BrokerController controller) {
try {

controller.start();

String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}

log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}

BrokerController

关键点

  • 构造方法 初始化config对象
  • initialize 关键成员的初始化工作
  • start 相关的对象的start还有想nameServer注册能
  • 关键成员 topicConfigManager、consumerOffsetManager、messageStore、以及remotingServer

构造方法

  • brokerOuterAPI:nettyClient–用于broker和外部模块沟通,有几个功能:
    • (1)和nameserver交互,进行broker节点的注册和取消;
    • (2)和其他broker节点交互;
  • topicConfigManager
  • consumerOffsetManager
  • subscriptionGroupManager
  • consumerFilterManager

initialize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
public boolean initialize() throws CloneNotSupportedException {
//分别从/root/store/config/下的topic.json、consumerOffset.json、consumerFilter.json

//存储每个topic的读写队列数、权限、是否顺序等信息
boolean result = this.topicConfigManager.load();

//存储每个消费者Consumer在每个topic上对于该topic的consumequeue队列的消费进度;
result = result && this.consumerOffsetManager.load();
//存储每个消费者Consumer的订阅信息。
result = result && this.subscriptionGroupManager.load();
//存储消费者过滤器的信息
result = result && this.consumerFilterManager.load();

if (result) {
try {
//todo 消息持久化的方法
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
//AbstractPluginMessageStore
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
//初始化messageStore
result = result && this.messageStore.load();

if (result) {
//初始化broker的server
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();

//todo *****不知道做什么的和remotingServer功能高度重合*****
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));

this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));

this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));

this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));

this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));

this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));

this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));

this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));

//注册netty-channelRead事件处理类
this.registerProcessor();

final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
//统计broker put和get的消息,一天一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);

//持久化offSet信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

//持久化consumerFilter信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

//熔断机制,将消费异常的消费组踢下去?防止某些consumer消息堆积
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);

//打印水位信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);

//获取已经持计划到到commitlog,但是还没有被消费的日志的byte大小
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

//得到nameServAddr
if (this.brokerConfig.getNamesrvAddr() != null) {
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
//和nameServAddr的心跳机制
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}

//slave角色特有的操作
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}

//定期从主同步Broker信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
//主会打印主从差异
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;

@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}

private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
((NettyRemotingServer) fastRemotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
initialTransaction();
}
return result;
}

registerProcessor()方法中各processor的对应关系

SendMessageProcessor

RequestCode.SEND_MESSAGE、RequestCode.SEND_MESSAGE_V2、RequestCode.SEND_BATCH_MESSAGE、RequestCode.CONSUMER_SEND_MSG_BACK、

PullMessageProcessor–处理拉取消息的请求

RequestCode.PULL_MESSAGE

QueryMessageProcessor

RequestCode.QUERY_MESSAGE、RequestCode.VIEW_MESSAGE_BY_ID

ClientManageProcessor–处理客户端的请求,如心跳等

RequestCode.HEART_BEAT、RequestCode.UNREGISTER_CLIENT、RequestCode.CHECK_CLIENT_CONFIG

ConsumerManageProcessor

RequestCode.GET_CONSUMER_LIST_BY_GROUP、RequestCode.UPDATE_CONSUMER_OFFSET、RequestCode.QUERY_CONSUMER_OFFSET

EndTransactionProcessor

RequestCode.END_TRANSACTION

AdminBrokerProcessor

registerDefaultProcessor

start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public void start() throws Exception {
//message保存模块 重要
if (this.messageStore != null) {
this.messageStore.start();
}

//netty的server端broker监听的端口 重要
if (this.remotingServer != null) {
this.remotingServer.start();
}

//netty的server端 remotingServer端口-2 不知道有什么用
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}

//文件监听类,和ssl有关不重要
if (this.fileWatchService != null) {
this.fileWatchService.start();
}

//nettyclient对外和nameserver以及其他broker交互
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}

//处理ResponseCode.PULL_NOT_FOUND等异常状态的消息
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}

//处理系统服务服务
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}

//启动filterServer
if (this.filterServerManager != null) {
this.filterServerManager.start();
}

//向nameserv发送数据进行broker的注册,注意非单向的equestCode.QUERY_DATA_VERSION,
this.registerBrokerAll(true, false, true);

//每隔一段时间注册一下broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}

//快速返回失败...开启之后每隔10ms会清理过去的request请求,见代码start
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}

//主的broker开始事物检查
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
if (this.transactionalMessageCheckService != null) {
log.info("Start transaction service!");
//run方法里的死循环wait一段时间,然后onWaitEnd中check()
this.transactionalMessageCheckService.start();
}
}
}

broker心跳机制

  • 心跳包发送:
    • ClientManageProcessor负责处理producer、consumer、其他broker(topic)发送的心跳包见heartBeat方法
  • 清理:
    • clientHousekeepingService定期会清理不用的链接

RocketMq-源码学习-零-提前的一些准备

Posted on 2019-09-01 | Edited on 2022-09-21 | In 消息队列 , rocketmq , 源码学习

看源码的一些准备工作,以下逻辑会出现多次。

RemotingServer

rocketMq采用NettyRemotingServer来处理各个组件之间的网络通信。

处理消息的方式

NettyRemotingServer的channelRead0()–>NettyServerHandler.processMessageReceived–>NettyServerHandler.processRequestCommand的processorTable处理,其中processorTable的方法是通过下面的方式注册进去

  • 注册消息的:通过registerProcessor方法,每种消息根据requestcode,来制定消息处理方式。
    • 维护了processorTable对象,HashMap<Integer/ request code /, Pair<NettyRequestProcessor, ExecutorService>>
    • 每一种request指定一个ExecutorService和一个registerProcessor来处理
  • 处理消息:通过调用processMessageReceived来处理。

NettyClient三种发送消息的方式

见信息的发送

invokeOneway:单向发送无返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
//get or create Channel
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
//限流
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
//释放
once.release();
......
}
});
} catch (Exception e) {
......
once.release();
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
......
throw new RemotingTimeoutException(info);
}
}
}

invokeAsync:异步发送方式,invokeCallback中处理返回值信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final int opaque = request.getOpaque();

//限流
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
}

//responseFuture每个请求创建一个responseFuture,在responseFuture调用invokeCallback
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
/**
* 标准的rpc异步处理流程
* 1.这里是讲requestId-->responseFuture放到responseTable
* 2.当server处理完发送请求之后发送回客户端由processResponseCommand方法处理,responseTable通过repsonse的Id,取到对应的responseFuture
* 3.invokeCallback来处理后续操作
*/
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
//处理失败的请求
requestFail(opaque);
......
}
});
} catch (Exception e) {
responseFuture.release();
......
}
} else {
......
异常处理
}
}

invokeSync:同步发送,等待返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
......超时
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response;
} catch (RemotingSendRequestException e) {
......
} catch (RemotingTimeoutException e) {
......
}
} else {
......
}
}

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();

try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}

responseTable.remove(opaque);
responseFuture.setCause(f.cause());
//发送消息成功到了通知responseFuture
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});

//async是直接交给processResponseCommand就返回了,这里是同步转异步等待一定时间
// 1.在processResponseCommand可以主动终止,发送ok
// 2.ChannelFutureListener失败,可以主动终止,失败
// 3.超时
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}

return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

系统优化-CPU-理解CPU的平均负载

Posted on 2019-08-28 | Edited on 2022-09-21 | In 性能优化 , CPU

本章主要讲解如何理解CPU的平均负载、以及常见的引起CPU平均负载升高的几种场景。

什么是CPU的平均负载

指单位时间内系统可运行(Runnable)以及不可中断(Uninterruptible Sleep)的平均进程数。其中不可中断是指系统对进程和设备的一种保护,比如读磁盘的文件为了保证系统的一致性这个进程不可中断 ps命令中是D

如何查看平均负载呢:

常用命令有:uptime|top|w,我们1以uptime为例,如:

1
2
$ uptime
02:34:03 up 2 days, 20:14, 1 user, load average: 0.63, 0.83, 0.88

其中:load average: 0.63, 0.83, 0.88 分别代表了最近1分钟、5分钟、15分钟的cpu平均负载,什么是laod avarage呢

平均负载反映了这段时间操作系统的CPU负载的趋势,一般来讲,一个健康的系统的平均负载不应该超过0.7。

引起平均负载升高的常见的三种场景

和平均负载相关的另一项数据是CPU的使用率,他们俩的关系根据场景有如下的联系

  1. 计算密集型:cpu使用率和cpu负载会同时升高
  2. I/O密集型:大量的线程处于i/o-wait,这时候cpu负载高但是cpu使用率并不高
  3. 当需要调度的进程数量大于cpu的时候,cpu负载和使用率会同时升高。

    我们怎么确定上述的3中场景呢?这时候我们可以用到如下的工具:

  • stress 压测工具
  • sysstat linux性能分析工具,常用mpstat和pidstat

模拟计算密集型

见下图:

mpstat的结果显示这时候看%iowait不高,但是%user已经100%了
pidstat的结果显示是stress引起的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
stress --cpu 1 --timeout 600 #一个核打满

#查看系统性能
# -P ALL 表示监控所有 CPU,后面数字 5 表示间隔 5 秒后输出一组数据
$ mpstat -P ALL 5
Linux 4.15.0 (ubuntu) 09/22/18 _x86_64_ (2 CPU)
13:30:06 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
13:30:11 all 50.05 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 49.95
13:30:11 0 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 100.00
13:30:11 1 100.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00

#分析由于iowait为0,user很高说明是CPU计算导致的,于是我们
# 间隔 5 秒后输出一组数据
$ pidstat -u 5 1
13:37:07 UID PID %usr %system %guest %wait %CPU CPU Command
13:37:12 0 2962 100.00 0.00 0.00 0.00 100.00 1 stress

模拟IO密集型

见下图:

mpstat的结果显示这时候看%iowait很高67.53,但是%user才0.43
pidstat的结果显示是stress引起的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#发送sync
$ stress -i 1 --timeout 600

#查看系统性能
# 显示所有 CPU 的指标,并在间隔 5 秒输出一组数据
$ mpstat -P ALL 5 1
Linux 4.15.0 (ubuntu) 09/22/18 _x86_64_ (2 CPU)
13:41:28 CPU %usr %nice %sys %iowait %irq %soft %steal %guest %gnice %idle
13:41:33 all 0.21 0.00 12.07 32.67 0.00 0.21 0.00 0.00 0.00 54.84
13:41:33 0 0.43 0.00 23.87 67.53 0.00 0.43 0.00 0.00 0.00 7.74
13:41:33 1 0.00 0.00 0.81 0.20 0.00 0.00 0.00 0.00 0.00 98.99

#分析使用率不高但是iowait很高
# 间隔 5 秒后输出一组数据,-u 表示 CPU 指标
$ pidstat -u 5 1
Linux 4.15.0 (ubuntu) 09/22/18 _x86_64_ (2 CPU)
13:42:08 UID PID %usr %system %guest %wait %CPU CPU Command
13:42:13 0 104 0.00 3.39 0.00 0.00 3.39 1 kworker/1:1H
13:42:13 0 109 0.00 0.40 0.00 0.00 0.40 0 kworker/0:1H
13:42:13 0 2997 2.00 35.53 0.00 3.99 37.52 1 stress
13:42:13 0 3057 0.00 0.40 0.00 0.00 0.40 0 pidstat

#发现stress导致的

大量CPU调度

pidstat的结果显示%wait的结果很高,%iowait和cpu都不高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#发送sync
$ stress -c 8 --timeout 600

#查看系统性能
# 间隔 5 秒后输出一组数据
$ pidstat -u 5 1
14:23:25 UID PID %usr %system %guest %wait %CPU CPU Command
14:23:30 0 3190 25.00 0.00 0.00 74.80 25.00 0 stress
14:23:30 0 3191 25.00 0.00 0.00 75.20 25.00 0 stress
14:23:30 0 3192 25.00 0.00 0.00 74.80 25.00 1 stress
14:23:30 0 3193 25.00 0.00 0.00 75.00 25.00 1 stress
14:23:30 0 3194 24.80 0.00 0.00 74.60 24.80 0 stress
14:23:30 0 3195 24.80 0.00 0.00 75.00 24.80 0 stress
14:23:30 0 3196 24.80 0.00 0.00 74.60 24.80 1 stress
14:23:30 0 3197 24.80 0.00 0.00 74.80 24.80 1 stress
14:23:30 0 3200 0.00 0.20 0.00 0.20 0.20 0 pidstat

#发现stress导致的

总结

  1. 计算密集型:mpstat如果%user很高,但是%iowait不高,再用pidstat查看进程的%wait如果也不高。
  2. I/O密集型:mpstat如果%user不高,但是%iowait很高,再用pidstat查看进程的%wait如果也不高。
  3. 大量的进程需要调度:mpstat如果%user不高,但是%iowait不高,再用pidstat查看进程的%wait如果很高。

RockeMq-初始以及demo的注解

Posted on 2019-08-25 | Edited on 2022-09-21 | In 消息队列 , rocketmq

producer

send-one-way

发送后不会检查发送状态,方法返回值void,常用于log的收集

1
producer.sendOneway(msg)

send-sync

同步发送场景,发送后会返回sendResult用于检查发送状态,适合于订单等场景

1
producer.sendOneway(msg)

send-async

异步发送场景,安全性介于上述两者之间,会有发送结果但是在SendCallback回调中处理

1
2
3
4
5
6
7
8
9
10
11
producer.send(msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("producer sendResult="+sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", -1, e);
e.printStackTrace();
}
});

consumer

保续消费

客户端:send(Message msg, MessageQueueSelector selector, Object arg)

  • 实现MessageQueueSelector

消费者

  • MessageListenerOrderly:保续消费

ack机制

  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS:消费成功
  • ConsumeConcurrentlyStatus.RECONSUME_LATER:重复消费。rocketmq会放到重试队列,这个重试TOPIC的名字是%RETRY%+consumergroup的名字

注意:

  1. 如果业务的回调没有处理好而抛出异常,会认为是消费失败当ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
  2. 当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有ConsumeConcurrentlyStatus.RECONSUME_LATER的这个状态,只有ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

offset的控制

和kafka类似的机制。

todo

  1. 后续补充对offset的理解
  2. 保续消费的实现

消息队列基础

Posted on 2019-08-19 | Edited on 2022-09-21 | In 消息队列

at-least-once:如果producer收到来自Kafka broker的确认(ack)或者acks = all,则表示该消息已经写入到Kafka。但如果producer ack超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入Kafka。如果broker在发送Ack之前失败,但在消息成功写入Kafka之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终consumer,这种策略可能导致重复的工作和不正确的结果。

at-most-once:如果在ack超时或返回错误时producer不重试,则该消息可能最终不会写入Kafka,因此不会传递给consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。

exactly-once:即使producer重试发送消息,消息也会保证最多一次地传递给最终consumer。该语义是最理想的,但也难以实现,这是因为它需要消息系统本身与生产和消费消息的应用程序进行协作。例如如果在消费消息成功后,将Kafka consumer的偏移量rollback,我们将会再次从该偏移量开始接收消息。这表明消息传递系统和客户端应用程序必须配合调整才能实现excactly-once。

1…111213…23

Liu hao

励志当好厨子的程序员

229 posts
54 categories
81 tags
RSS
GitHub E-Mail
© 2018 – 2023 Liu hao
Powered by Hexo v3.9.0
|
Theme – NexT.Pisces v7.0.0