RocketMQ源码学习-可用性研究

消息发送过程中使用MQFaultStrategy来保证系统的稳定性,代码位于包org.apache.rocketmq.client.latency中,它主要通过LatencyFaultToleranceImpl来维护并判断Broker的可用性。

MQFaultStrategy

主要成员

1
2
3
4
5
6
7
8
9
10
11
// Latency fault-tolerance component; keeps the per-broker latency/fault records
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

// Switch for the latency fault-tolerance strategy (off by default)
private boolean sendLatencyFaultEnable = false;

// Latency thresholds in ascending order (presumably milliseconds — confirm against callers)
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};

// Not-available durations paired index-by-index with latencyMax: when the measured latency
// reaches latencyMax[i] (highest matching i wins), the broker is penalised for notAvailableDuration[i]
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

selectOneMessageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Selects the message queue to use for a send.
*
* When the latency-fault switch is on:
* 1. Walk the topic's queue list and return the first queue whose broker is reported
*    available by latencyFaultTolerance (subject to the lastBrokerName check in the loop).
* 2. If no queue qualifies, ask latencyFaultTolerance.pickOneAtLeast() for a broker and,
*    if that broker has write queues for this topic, return a queue redirected to it.
* 3. Otherwise (or if anything above throws) fall back to tpInfo.selectOneMessageQueue(),
*    ignoring availability.
*
* When the switch is off, delegates to tpInfo.selectOneMessageQueue(lastBrokerName).
*
* @param tpInfo publish info of the topic, providing the queue list and the rotating send index
* @param lastBrokerName broker name to compare candidates against; may be null
*                       (presumably the broker used by the previous attempt — confirm with callers)
* @return the selected message queue
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// Latency fault-tolerance strategy enabled
if (this.sendLatencyFaultEnable) {
try {
// Pick a starting position from the rotating index; return the first queue that qualifies
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// NOTE(review): the lines computing `pos` are elided in this excerpt; presumably
// pos is derived from index + i modulo the queue count — confirm against the full source
......
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// Broker is currently available: return this queue if lastBrokerName is null
// or the queue belongs to the same broker as lastBrokerName
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}

// No queue qualified above: pick a relatively good broker from the fault table
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
// Redirect the selected queue to the picked broker, rotating over its write queues
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
// The picked broker has no write queues for this topic: drop it from the fault table
// NOTE(review): notBestBroker can be null here (pickOneAtLeast returns null when the
// table is empty) — confirm that remove(...) tolerates a null argument
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}

// Fallback: select without considering availability
return tpInfo.selectOneMessageQueue();
}

// Strategy disabled: select without considering availability
return tpInfo.selectOneMessageQueue(lastBrokerName);
}

updateFaultItem

发送完Message之后,根据本次发送所花费的时间来计算该broker的不可用时长

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Records the outcome of a send so the broker's not-available window can be adjusted.
 * Does nothing when the latency-fault switch is off.
 *
 * @param brokerName     name of the broker the message was sent to
 * @param currentLatency time the send took
 * @param isolation      when true, the not-available duration is looked up as if the latency
 *                       were 30000 instead of the measured value
 */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (!this.sendLatencyFaultEnable) {
        return;
    }

    // Only the penalty lookup uses the substituted value; the measured latency is still
    // what gets stored on the fault record.
    final long penaltyLatency = isolation ? 30000 : currentLatency;
    final long duration = computeNotAvailableDuration(penaltyLatency);
    this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}

/**
 * Maps a latency to a not-available duration: scans latencyMax from the highest threshold
 * down and returns notAvailableDuration[i] for the first threshold the latency reaches.
 *
 * @param currentLatency latency to classify
 * @return the matching not-available duration, or 0 when the latency is below every threshold
 */
private long computeNotAvailableDuration(final long currentLatency) {
    int level = latencyMax.length;
    while (--level >= 0) {
        if (latencyMax[level] <= currentLatency) {
            return this.notAvailableDuration[level];
        }
    }

    return 0;
}

LatencyFaultToleranceImpl

重要方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * Creates or refreshes the fault record for the given name.
 *
 * The record's latency is set to {@code currentLatency} and its start timestamp to the
 * current time plus {@code notAvailableDuration}.
 *
 * @param name                 broker name used as the table key
 * @param currentLatency       time the latest operation took
 * @param notAvailableDuration how long, in milliseconds from now, to push the start timestamp out
 */
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem existing = this.faultItemTable.get(name);
    if (existing == null) {
        final FaultItem created = new FaultItem(name);
        created.setCurrentLatency(currentLatency);
        created.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        existing = this.faultItemTable.putIfAbsent(name, created);
        if (existing == null) {
            // Our freshly built record was inserted; nothing left to refresh.
            return;
        }
        // Otherwise another record was inserted first: fall through and refresh that one.
    }

    existing.setCurrentLatency(currentLatency);
    existing.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}

/**
 * Tells whether the named broker may be used.
 *
 * @param name broker name used as the table key
 * @return {@code true} when no fault record exists for the name; otherwise whatever the
 *         record's own {@code isAvailable()} reports
 */
@Override
public boolean isAvailable(final String name) {
    final FaultItem record = this.faultItemTable.get(name);
    return record == null || record.isAvailable();
}

/**
 * Picks one name out of the fault table.
 *
 * All records are copied into a list, shuffled, then sorted by FaultItem's natural ordering
 * (presumably best candidates first — confirm against FaultItem.compareTo). The result is
 * chosen from the first half of the sorted list by rotating the whichItemWorst index; with
 * fewer than two records the first record is used.
 *
 * @return the chosen name, or {@code null} when the table is empty
 */
@Override
public String pickOneAtLeast() {
    List<FaultItem> candidates = new LinkedList<FaultItem>();
    for (Enumeration<FaultItem> it = this.faultItemTable.elements(); it.hasMoreElements(); ) {
        candidates.add(it.nextElement());
    }

    if (candidates.isEmpty()) {
        return null;
    }

    // Shuffle before sorting, as the original does.
    Collections.shuffle(candidates);
    Collections.sort(candidates);

    final int firstHalf = candidates.size() / 2;
    // The rotating index is only advanced when there is more than one slot to rotate over.
    final int chosen = firstHalf <= 0 ? 0 : this.whichItemWorst.getAndIncrement() % firstHalf;
    return candidates.get(chosen).getName();
}

消费者的负载均衡

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.consumer.rebalance;

import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.ArrayList;
import java.util.List;

/**
 * Average Hashing queue algorithm.
 *
 * Splits the queue list into contiguous slices, one per consumer id, in the order the
 * consumer ids appear in {@code cidAll}. When the queues do not divide evenly, the first
 * {@code mqAll.size() % cidAll.size()} consumers each receive one extra queue.
 */
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();

    /**
     * Computes the slice of {@code mqAll} assigned to {@code currentCID}.
     *
     * @param consumerGroup consumer group name, used only in the log message
     * @param currentCID    id of the consumer to allocate for
     * @param mqAll         all message queues to distribute
     * @param cidAll        all consumer ids taking part in the distribution
     * @return the queues assigned to {@code currentCID}; empty when the id is not in
     *         {@code cidAll} or when there are more consumers than queues and this
     *         consumer falls past the last queue
     * @throws IllegalArgumentException if {@code currentCID} is null or empty, or if
     *                                  {@code mqAll} or {@code cidAll} is null or empty
     */
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        if (currentCID == null || currentCID.length() < 1) {
            throw new IllegalArgumentException("currentCID is empty");
        }
        if (mqAll == null || mqAll.isEmpty()) {
            throw new IllegalArgumentException("mqAll is null or mqAll empty");
        }
        if (cidAll == null || cidAll.isEmpty()) {
            throw new IllegalArgumentException("cidAll is null or cidAll empty");
        }

        final List<MessageQueue> assigned = new ArrayList<MessageQueue>();
        if (!cidAll.contains(currentCID)) {
            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                consumerGroup,
                currentCID,
                cidAll);
            return assigned;
        }

        final int queueCount = mqAll.size();
        final int consumerCount = cidAll.size();

        // Position of this consumer within the consumer id list.
        final int position = cidAll.indexOf(currentCID);
        // Number of queues left over after an even split.
        final int leftover = queueCount % consumerCount;
        // The first `leftover` consumers each take one of the left-over queues.
        final boolean takesExtra = leftover > 0 && position < leftover;

        // Number of queues this consumer should own.
        final int share;
        if (queueCount <= consumerCount) {
            // At least as many consumers as queues: one queue each.
            share = 1;
        } else if (takesExtra) {
            share = queueCount / consumerCount + 1;
        } else {
            share = queueCount / consumerCount;
        }

        // Consumers that take an extra queue sit back to back from the start; the others
        // must skip over the `leftover` extra queues handed out before them.
        final int first = takesExtra ? position * share : position * share + leftover;

        // With more consumers than queues and position >= leftover, this goes negative,
        // the loop does not run, and the surplus consumer gets nothing.
        final int count = Math.min(share, queueCount - first);
        for (int offset = 0; offset < count; offset++) {
            assigned.add(mqAll.get((first + offset) % queueCount));
        }
        return assigned;
    }

    /**
     * @return the strategy name, {@code "AVG"}
     */
    @Override
    public String getName() {
        return "AVG";
    }
}