关于消息的消费分为consumer和broker俩个模块,会分为俩篇文章阐述
消费者按照消费方式的不同分为:
- MQPushConsumer实现类是:DefaultMQPushConsumer,由broker推送消息给consumer
MQPullConsumer实现类是:DefaultMQPullConsumer,定期去borker拉取消息
消费消息的逻辑通过接口MessageListener,它有俩个实现接口
MessageListenerConcurrently:并发的消费
MessageListenerOrderly:保序消费,
消费分为俩种模式MessageModel,即:
CLUSTERING:集群,默认
- BROADCASTING:广播
DefaultMQPushConsumer
类图:
整个流程如下:new DefaultMQPushConsumer()–>subscribe–>start()
create以及重要属性
1 | /** |
主要属性
- defaultMQPushConsumerImpl:内部实现
- consumerGroup:消费组
- messageModel:消费方式默认是CLUSTERING、BROADCASTING:广播
- consumeFromWhere,消费者启动前要设置ConsumeFromWhere,他们的含义是
- ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:新消费第一次启动时候从最开始的Offset开始消费
- ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:新消费第一次启动时候从最近的Offset开始消费
- ConsumeFromWhere.CONSUME_FROM_TIMESTAMP::新消费第一次启动时候从最时间戳开始消费
- consumeTimestamp:默认半小时前和CONSUME_FROM_TIMESTAMP配合使用
- allocateMessageQueueStrategy:消费者端的队列选择器,消费者从特定的queue消费消息’
- subscription:tag的过滤器
- messageListener:负责消息的消费
- offsetStore:负责存储offset
- consumeThreadMin和consumeThreadMax:消费消息启动的最小、最大线程数
- adjustThreadPoolNumsThreshold:当消息堆积到该阈值时候会动态的调整消费线程数
- consumeConcurrentlyMaxSpan:当队列允许的最大offset的跨度,到达后触发限流。只对并发消费(ConsumeMessageConcurrentlyService)有效
- pullThresholdForQueue:每条consume queue的消息拉取下来后会缓存到本地,消费结束会删除。当累积达到一个阈值后,会触发该consumerqueue限流
- pullThresholdForTopic:同上只是这个是针对topic的
- pullThresholdSizeForQueue和pullThresholdSizeForTopic:限制消息缓存的大小单位是MiB,默认无限制
- pullInterval:拉取消息队列的间隔,默认是0
- consumeMessageBatchMaxSize:批量消费消息的条数,messageListener的方法中是个List[<]MessageExt[>] msgs,这个值就是控制msgs的的数量
- pullBatchSize:去broker拉取消息一次多少条,和上面的属性关系一次拉取pullBatchSize条,分成pullBatchSize/consumeMessageBatchMaxSize个任务去消费 ??
- postSubscriptionWhenPull:每次拉取的时候是否更新订阅关系
- unitMode:
- maxReconsumeTimes:重试次数,达到会进入到私信队列,默认是是-1
- 并行:16次
- 串行:无限次。一直等到消费成功后才会笑一次
- suspendCurrentQueueTimeMillis:串行消费重试的建个
- consumeTimeout:消费过期时间
subscribe-订阅
通过调用DefaultMQPushConsumerImpl#subscribe方法实现订阅
1 | public void subscribe(String topic, String subExpression) throws MQClientException { |
start
DefaultMQPushConsumer#start()会调用DefaultMQPushConsumerImpl#start(),即:
1 | public synchronized void start() throws MQClientException { |
broker在收到HEAT_BEAT的请求后,会调用heatbeat方法,该方法同时处理prodcuer和consumer的注册,其中consumer的注册是调用this.brokerController#getConsumerManager#registerConsumer的方法,代码如下:
1 | public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { |
遗留问题:this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
SubscriptionData
rebalanceImpl
消息的消费
虽然叫做DefaultMQPushConusmerImpl,但是消息的消费实际上是一个pull的过程,那么消息是如何保证实时性的呢?
消息的消费分为拉取消息和消费消息里俩步,拉取消息是PullMessageService类来负责,消费消息ConsumeMessageService来负责
在PullMessageService中是一个用阻塞队列实现的典型的生产者消费者模式实现的。见代码:
1 | public void executePullRequestImmediately(final PullRequest pullRequest) { |
其中executePullRequestImmediately就是触发PullRequest的。
- 当我们的消费者start()之后会调用mQClientFactory.rebalanceImmediately(),在跟进该方法的过程中发现updateProcessQueueTableInRebalance方法,最终会调用dispatchPullRequest该方法会调用executePullRequestImmediately实际上就触发了消息的拉取
- run会调用DefaultMQPushConsumerImpl的pullMessage方法在改方法中通过pullAPIWrapper.pullKernelImpl去broker拉取消息。
- 在拉取之后并且在pullCallback中调用ConsumeMessageService消费消息。并且重新调用executePullRequestImmediately重复上述2。
综上,rocketmq就是用这种不停的take–>pull–>add来保证消息的实时性的
1 | public void pullMessage(final PullRequest pullRequest) { |
拉取消息在consumer测的过程
- pullAPIWrapper.pullKernelImpl会调用MQClientAPIImpl#pullMessage方法给broker发送RequestCode.PULL_MESSAGE请求
- 发送方式默认是ASYNC,所以在pullMessageAsync方法中会处理将response转成PullResultExt对象,这时候MsgList是空。
- 通过调用 processPullResponse()方法来转换
- 之后在InvokeCallback回调中调用pullCallback完成消费
pullMessageAsync具体代码
1 | private void pullMessageAsync( |
processPullResponse具体代码
1 | private PullResult processPullResponse( |
consumer消费消息
拉取到消息后,在PullCallBack中会调用consumeMessageService.submitConsumeRequest来消费消息,实际上是封装成ConsumeRequest在线程池中批量消费消息,具体见下面:
1 | class ConsumeRequest implements Runnable { |
顺序消费
和Currently类似,不过首先对消息进行排序,首先将消息放入到processQueue的红黑树中,另外在消费时候先加锁,所粒度是MessageQueue,保证每个MessageQueue的顺序是一致的。如代码:
由于consumer只能做到单messageQueue消息有序,我们在producer侧生产消息时候应该按照我们的保序字段selectQueue才行,保证相同id的都打到一个messageQueue中
1 | class ConsumeRequest implements Runnable { |
由于顺序消费为每个MessageQueue要加锁,为了创建大量的无用线程,在processQueue#putMessage方法会判断consuming的状态,如果当前processQueue已经在消费中了,消息只是放入红黑树但是不会启动线程去消费。
1 | /** |
处理消费请求
ConsumeMessageConcurrentlyService
1 | public void processConsumeResult( |
orderly和上述类似不过有一个提交过程
1 | public boolean processConsumeResult( |