RoketMq源码学习-五-消息的消费-上

关于消息的消费分为consumer和broker俩个模块,会分为俩篇文章阐述

消费者按照消费方式的不同分为:

  • MQPushConsumer实现类是:DefaultMQPushConsumer,由broker推送消息给consumer
  • MQPullConsumer实现类是:DefaultMQPullConsumer,定期去borker拉取消息

    消费消息的逻辑通过接口MessageListener,它有俩个实现接口

  • MessageListenerConcurrently:并发的消费

  • MessageListenerOrderly:保序消费,

    消费分为俩种模式MessageModel,即:

  • CLUSTERING:集群,默认

  • BROADCASTING:广播

DefaultMQPushConsumer

类图:

avator

整个流程如下:new DefaultMQPushConsumer()–>subscribe–>start()

create以及重要属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
*
* @param namespace Namespace for this MQ Producer instance.
* @param consumerGroup Consume queue.
* @param rpcHook RPC hook to execute before each remoting command.
* @param allocateMessageQueueStrategy Message queue allocating algorithm.消费端实现的队列选择器
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}

主要属性

  • defaultMQPushConsumerImpl:内部实现
  • consumerGroup:消费组
  • messageModel:消费方式默认是CLUSTERING、BROADCASTING:广播
  • consumeFromWhere,消费者启动前要设置ConsumeFromWhere,他们的含义是
    • ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:新消费第一次启动时候从最开始的Offset开始消费
    • ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:新消费第一次启动时候从最近的Offset开始消费
    • ConsumeFromWhere.CONSUME_FROM_TIMESTAMP::新消费第一次启动时候从最时间戳开始消费
  • consumeTimestamp:默认半小时前和CONSUME_FROM_TIMESTAMP配合使用
  • allocateMessageQueueStrategy:消费者端的队列选择器,消费者从特定的queue消费消息’
  • subscription:tag的过滤器
  • messageListener:负责消息的消费
  • offsetStore:负责存储offset
  • consumeThreadMin和consumeThreadMax:消费消息启动的最小、最大线程数
  • adjustThreadPoolNumsThreshold:当消息堆积到该阈值时候会动态的调整消费线程数
  • consumeConcurrentlyMaxSpan:当队列允许的最大offset的跨度,到达后触发限流。只对并发消费(ConsumeMessageConcurrentlyService)有效
  • pullThresholdForQueue:每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consumerqueue限流
  • pullThresholdForTopic:同上只是这个是针对topic的
  • pullThresholdSizeForQueue和pullThresholdSizeForTopic:限制消息缓存的大小单位是MiB,默认无限制
  • pullInterval:拉取消息队列的间隔,默认是0
  • consumeMessageBatchMaxSize:批量消费消息的条数,messageListener的方法中是个List[<]MessageExt[>] msgs,这个值就是控制msgs的的数量
  • pullBatchSize:去broker拉取消息一次多少条,和上面的属性关系一次拉取pullBatchSize条,分成pullBatchSize/consumeMessageBatchMaxSize个任务去消费 ??
  • postSubscriptionWhenPull:每次拉取的时候是否更新订阅关系
  • unitMode:
  • maxReconsumeTimes:重试次数,达到会进入到私信队列,默认是是-1
    • 并行:16次
    • 串行:无限次。一直等到消费成功后才会笑一次
  • suspendCurrentQueueTimeMillis:串行消费重试的建个
  • consumeTimeout:消费过期时间

subscribe-订阅

通过调用DefaultMQPushConsumerImpl#subscribe方法实现订阅

1
2
3
4
5
6
7
8
9
10
11
12
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}

start

DefaultMQPushConsumer#start()会调用DefaultMQPushConsumerImpl#start(),即:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
//防止重复启动先改状态
this.serviceState = ServiceState.START_FAILED;

//消费者的配置校验
this.checkConfig();

//处理subcription和messageListenerInner
this.copySubscription();

//defaultMQPushConsumer的InstanceName改为PID
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}

//创建MQClient和生产者共用一个类
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

//
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//默认是AllocateMessageQueueAveragely
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

//初始化拉取消息的Api封装类
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

//初始化OffsetStore,根据消息类型,如果是广播LocalFileOffsetStore,如果是集群消息需要RemoteBrokerOffsetStore
//RemoteBrokerOffsetStore 从broker获取topic的
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//加载offset
this.offsetStore.load();

//根据消费方式初始化并且启动consumeMessageService todo 重点看
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();

//mqClinetInstance注册消费者到内存中
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}

mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

/**
* 如果订阅关系存在通过ReblanceImpl获取订阅关系rebalanceImpl#getSubscriptionInner,获取订阅关系
* 如果订阅关系不为空。通过NameServer获取TopicRouteInfo并且更新rebalanceImpl中的topicSubscribeInfoTable
*/
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
//去broker校验Consumer状态,包括consumer的substring的有孝心等
this.mQClientFactory.checkClientInBroker();
//给所有的Broker发送心跳包(code是HEART_BEAT),包括遍历每个消费者生成Consumer信息包括,消费这的类型、消息模式、FromWhere、订阅信息等,
// broker发送HEAT_BEAT请求,完成消费者的注册等工作
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 1.mQClientFactory#rebalanceImmediately唤醒RebalanceService
// 2.RebalanceService#run调用mQClientFactory#doRebalance
// 3.mQClientFactory#doRebalance遍历ConsumerTable(上面registerConsumer方法put的consumer),调用DefaultMQPushConsumerImpl#doRebalance
// 4.DefaultMQPushConsumerImpl#doRebalance调用RebalanceImpl.doRebalance进行遍历
// 5.**还有个重要的目的通过dorebalnce激活pullmessageService的run方法,在死循环中向broker发送pullMessage的消息
this.mQClientFactory.rebalanceImmediately();
}

broker在收到HEAT_BEAT的请求后,会调用heatbeat方法,该方法同时处理prodcuer和consumer的注册,其中consumer的注册是调用this.brokerController#getConsumerManager#registerConsumer的方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
...... //解析协议
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}

//创建RetryTopic:%RETRY%ConsumerGroup,给所有的broker发送信息注册这个topic
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),//默认是1
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}

//broker通过consumerManager注册消费者
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
......
}

/**
* 注册消费者
*
* @param group 消费组
* @param clientChannelInfo 客户端链接的信息
* @param consumeType 消费类型 push or pull
* @param messageModel 消息类型
* @param consumeFromWhere fromWhere
* @param subList 订阅关系的列表,每个consumer的RebalanceImpl#subscriptionInner在注册时候put进去的信息
* @param isNotifyConsumerIdsChangedEnable 是否通知客户端的开关
* @return
*/
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
//get or create ConsumerGroupInfo
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}

//更新clientChannelInfo
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);

//更新SubscriptionData,将remote的更新到本地,并且清理掉已经不存在的SubscriptionData
boolean r2 = consumerGroupInfo.updateSubscription(subList);

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
//通知所有的消费者,该group订阅关系发送变化 发送NOTIFY_CONSUMER_IDS_CHANGED事件,所有该group下的consumer会立刻做doBalance操作
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}

//consumerIdsChangeListener处理注册事件,调用getConsumerFilterManager().register todo mark 待看
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

return r1 || r2;
}

遗留问题:this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

SubscriptionData

rebalanceImpl

消息的消费

虽然叫做DefaultMQPushConusmerImpl,但是消息的消费实际上是一个pull的过程,那么消息是如何保证实时性的呢?

消息的消费分为拉取消息和消费消息里俩步,拉取消息是PullMessageService类来负责,消费消息ConsumeMessageService来负责

在PullMessageService中是一个用阻塞队列实现的典型的生产者消费者模式实现的。见代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}

@Override
public void run() {
log.info(this.getServiceName() + " service started");

//死循环,从队列中拉取消息
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}

log.info(this.getServiceName() + " service end");
}

其中executePullRequestImmediately就是触发PullRequest的。

  1. 当我们的消费者start()之后会调用mQClientFactory.rebalanceImmediately(),在跟进该方法的过程中发现updateProcessQueueTableInRebalance方法,最终会调用dispatchPullRequest该方法会调用executePullRequestImmediately实际上就触发了消息的拉取
  2. run会调用DefaultMQPushConsumerImpl的pullMessage方法在改方法中通过pullAPIWrapper.pullKernelImpl去broker拉取消息。
  3. 在拉取之后并且在pullCallback中调用ConsumeMessageService消费消息。并且重新调用executePullRequestImmediately重复上述2。

综上,rocketmq就是用这种不停的take–>pull–>add来保证消息的实时性的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
//省略校验操作
......
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);

//FOUND NO_NEW_MSG NO_MATCHED_MSG OFFSET_ILLEGAL都会将pullRequest重新放入到pullRequestQueue对列中,
//然后PullMessageService的run()方法在进行下一次的拉取。
//PullMessageService的run方法是个死循环,通过pullRequestQueue.take来保证实时性。
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
//如果msgFoundList为空
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
//
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//消息的消费逻辑
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);

//根据设置进行下一次的拉取任务
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}

if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}

break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

//
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

//
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:

log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());

pullRequest.getProcessQueue().setDropped(true);
//OFFSET_ILLEGAL 在10s后会通过rebalanceImpl#removeProcessQueue摘除messageQueue
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);

DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}

@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}

DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}

classFilter = sd.isClassFilterMode();
}

int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);

//重点:想broker发送PULL_MESSAGE的请求拉取消息,拉取后将response转成PullResultExt然后经过pullCallback处理拉取结果
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}

拉取消息在consumer测的过程

  1. pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl#pullMessage方法给broker发送RequestCode.PULL_MESSAGE请求
  2. 发送方式默认是ASYNC,所以在pullMessageAsync方法中会处理将response转成PullResultExt对象,这时候MsgList是空。
    • 通过调用 processPullResponse()方法来转换
  3. 之后在InvokeCallback回调中调用pullCallback完成消费

pullMessageAsync具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void pullMessageAsync(
final String addr,
final RemotingCommand request,//pullmessage请求
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
//pullCallback.onException...省略
}
}
});
}

processPullResponse具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private PullResult processPullResponse(
final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
//根据repsonse转换code
switch (response.getCode()) {
case ResponseCode.SUCCESS:
pullStatus = PullStatus.FOUND;
break;
case ResponseCode.PULL_NOT_FOUND:
pullStatus = PullStatus.NO_NEW_MSG;
break;
case ResponseCode.PULL_RETRY_IMMEDIATELY:
pullStatus = PullStatus.NO_MATCHED_MSG;
break;
case ResponseCode.PULL_OFFSET_MOVED:
pullStatus = PullStatus.OFFSET_ILLEGAL;
break;

default:
throw new MQBrokerException(response.getCode(), response.getRemark());
}

PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}

consumer消费消息

拉取到消息后,在PullCallBack中会调用consumeMessageService.submitConsumeRequest来消费消息,实际上是封装成ConsumeRequest在线程池中批量消费消息,具体见下面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class ConsumeRequest implements Runnable {
private final List<MessageExt> msgs;
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;

public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
this.msgs = msgs;
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}

public List<MessageExt> getMsgs() {
return msgs;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

@Override
public void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
return;
}

MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}

//自定义的消费逻辑
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}

//执行consumer的hook
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}

if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

if (!processQueue.isDropped()) {
//处理消费结果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

}

顺序消费

和Currently类似,不过首先对消息进行排序,首先将消息放入到processQueue的红黑树中,另外在消费时候先加锁,所粒度是MessageQueue,保证每个MessageQueue的顺序是一致的。如代码:

由于consumer只能做到单messageQueue消息有序,我们在producer侧生产消息时候应该按照我们的保序字段selectQueue才行,保证相同id的都打到一个messageQueue中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;

//ConsumeRequest只是面向processQueue和messageQueue的
//这里设计的很不错,由于顺序消费针对MessageQueue所以单Queue的消费是线性的,为了提高吞吐,采用了ProcessQueue镜像队列,用红黑树的思想对消息offset排序然后压进队列,ConsumeRequest不停的消费ProcessQueue里弹出来的消息
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

@Override
public void run() {
//......

//关键代码锁机制,锁粒度是messageQueue,一个MessageQueue对应一个锁,也就是单一的messageQueue消费有序,需要发送时候选择messageQueue
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
//todo 大哥你用个while不行么!!!一直从processQueue中弹出来BatchSize个消息,进行消费知道空为止
//因为,不同的pullMessage会不停的给processQueue中压值知道满足阈值为止。
for (boolean continueConsume = true; continueConsume; ) {
//....省略校验逻辑

final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

//从processQueue的红黑树按照顺序take出consumeBatchSize个消息
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;

ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//保证线程安全,保证每个消息消费的线程安全
this.processQueue.getLockConsume().lock();
//.....
//消费逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//......
hasException = true;
} finally {
//解锁
this.processQueue.getLockConsume().unlock();
}

// 处理消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
// ......
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}

}

由于顺序消费为每个MessageQueue要加锁,为了创建大量的无用线程,在processQueue#putMessage方法会判断consuming的状态,如果当前processQueue已经在消费中了,消息只是放入红黑树但是不会启动线程去消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 将Messages放到红黑树中
* @param msgs message信息
* @return 如果msgTreeMap不为空,且当前不在消费返回true,为true启动线程开始消费,如果消费中就不在启动线程了防止高并发情况下大量的异步线程将线程池打满
*/
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
//锁因为msgTreeMap不是线程安全的
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);

//message不为空,并且当前没有consuming.
// 因为顺序消费是启动一个线程一直processQueue中弹Message到processQueue为空
// 所以当前红黑树不为空,并且不在消费中返回true,起线程开始消费,如果消费中就不在启动线程了防止高并发情况下大量的异步线程将线程池打满
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}

if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}

return dispatchToConsume;
}

处理消费请求

ConsumeMessageConcurrentlyService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
return;

switch (status) {
case CONSUME_SUCCESS:
//ackIndex取consumeRequest.getMsgs()最后的一个元素小表
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;//size
int failed = consumeRequest.getMsgs().size() - ok;//0
//统计
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
//遍历message列表,发送RequestCode.CONSUMER_SEND_MSG_BACK请求给broker,如果有异常,加入到msgBackFailed
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}

//消费请求中去掉发送CONSUMER_SEND_MSG_BACK的消息,然后对这些发送msgBack失败的消息在消费一遍。
//这里就是为什么我们程序要做幂等行的判断
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);

this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}

//更新内存中的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}

orderly和上述类似不过有一个提交过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
//提交
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

return continueConsume;
}