RoketMq源码学习-七-事务的实现

RocketMq分布式事务采用的是XA协议。其中的TransactionManager由Broker担任。

原理

  1. producer发送half-Message(prepare)给broker。
  2. 发送成功后,producer侧开始执行本地事务,sendResult会包含transactionId和本地事务进行绑定,之后broker反查消息需通过这个ID来处理事物。(用户实现)
  3. 本地事务执行成功后,发送commit或者rollback状态给broker。
  4. Broker确认是否收到了Producer发送的commit或者rollback的消息
    1. Broker收到消息
      1. 如果收到的是commit,认为事务提交成功,交由consumer处理
      2. 如果收到的是rollback,认为事务回滚,consumer看不到本条消息。broker删除half-Message。(rocketmq会先将消息发送到一个事务专用的topic中QueueId为0,然后如果commit成功会将消息移到real_topic中,消费者订阅的是real_topic这时候就能看到消息了,如果rollback是不会移动的消费者就看不到这条消息)
    2. Broker没收到消息
      1. borker定时回查本地事务,如果本地事务已经执行返回commit;如果本地事务未执行rollback;(用户实现)
        1. commit:返回commit,说明事务已经提交,consumer进行执行
        2. rollback:回滚:
        3. unknown:一直会差到成功为止
  5. 事务的消费,和传统消息消费没什么区别,不过要注意,rocketmq的消费者侧的本地事务如果失败了,需要自行解决数据一致性,毕竟rocketmq整体事务回滚代价太大了

官方Demo

发送的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//事务的监听器,本地事务在这里进行
TransactionListener transactionListener = new TransactionListenerImpl();

//生产者采用TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

//处理broker会查本地事务的线程池
//producer接到broker发送的RequestCode.CHECK_TRANSACTION_STATE,异步来会差处理本地事务的状态,并且返回给borker
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//send half message给broker
//如果sendResult是sendOk会开始执行transactionListener.executeLocalTransaction逻辑,即本地事务。
//然后根据localTransactionState发送请求给RequestCode.END_TRANSACTION,broker来选择事务是否完成
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}

for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}

源码解析

Producer

TransactionMQProducer关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TransactionMQProducer extends DefaultMQProducer {

//用于borker回查事务消息用
private ExecutorService executorService;
//监听器
private TransactionListener transactionListener;

@Override
public void start() throws MQClientException {
//初始化事务相关的环境具体见小面代码
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}

@Override
public void shutdown() {
super.shutdown();
//停止事务
this.defaultMQProducerImpl.destroyTransactionEnv();
}


@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
//发送事务类型的消息
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}

defaultMQProducerImpl的相关代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
public void initTransactionEnv() {
//TransactionMQProducer中的defaultMQProducerImpl持有的对象只可能是TransactionMQProducer
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
}

public void destroyTransactionEnv() {
if (this.checkExecutor != null) {
this.checkExecutor.shutdown();
}
}

/**
* 1.发送half-message消息sendOk执行本地任务
* 2.执行完本地事务通过endTransaction消息通知borker事务结果
* 3.封装TransactionSendResult返回
* @param msg half message
* @param localTransactionExecuter 已经不在需要了传空就好了,5.0.0会用producer的transactionListener替代
* @param arg 消息参数
* @return TransactionSendResult 返回执行结果
* @throws MQClientException 异常
*/
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
//为Message设置相关Tranaction的属性,根据PROPERTY_TRANSACTION_PREPARED属性broker知道是half-message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
//发送half-message
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//发送结果返回了transactionId,将结果写入到Property。暂时没用到感觉,而且sendResult.getTransactionId为空
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
//发送消息调用setUniqID时候生成的唯一ID作为事务ID:业务方通过这个事务ID和本地事务绑定处理反差逻辑
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}

//不用关心5.0.0会删掉
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//处理本地事物返回localTransactionState
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}

if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

//给borker发送本地事物结束的request
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
//通过half-message获取MessageId,优先取OffsetMsgId:消息追加到磁盘时候生成的ID16byte,包含【来源地址+msg在文件中的偏移量】)
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}

//为空
String transactionId = sendResult.getTransactionId();
//查找borker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

//构建EndTransactionRequest请求
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
//设置Message的标记是回滚还是commit
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}

requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}

broker

主要有3个部分

接收半消息消息

处理half-message的入口在SendMessageProcessor.processRequest()方法中,前面介绍消息发送时候介绍过这里不在过多赘述,最终我们跟到SendMessageProcessor.sendMessage方法中,这里在处理发送的事件时候会判断消息的PROPERTY_TRANSACTION_PREPARED标记是否为true如果为true。会调用brokerController.getTransactionalMessageService().prepareMessage,具体如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
......
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//这里就是重点
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
......
}
....
}

继续跟下去:brokerController.getTransactionalMessageService中TransactionalMessageService的实现类TransactionalMessageServiceImpl的prepareMessage方法

1
2
3
4
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.putHalfMessage(messageInner);
}

继续:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}

//处理preparedMessage
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//RMQ_SYS_TRANS_HALF_TOPIC,queueId始终为0
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

回查half-message

brokerController在启动时候,会启动一个检查线程定期林旭RMQ_SYS_TRANS_HALF_TOPIC这个Topic中是否有过期的half-message,然后对这个half-message的producer发送RequestCode.CHECK_TRANSACTION_STATE的请求

过程和代码如下:

  1. brokerController在initialTransaction初始化TransactionalMessageCheckService
  2. brokerController在start时候调用TransactionalMessageCheckService的start方法启动线程
  3. TransactionalMessageCheckService每隔1分钟轮询一次RMQ_SYS_TRANS_HALF_TOPIC这个topic,发现过期half-message后通知producer。

TransactionalMessageCheckService关键代码

1
2
3
4
5
6
7
8
9
10
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
//检查过期事务
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

继续跟TransactionalMessageServiceImpl.check

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Override
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
//Half-message topic
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.info("Check topic={}, queues={}", topic, msgQueues);
//有Half-message,检查如果有消息过期默认是6sec,省略大部分校验过程
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
......

List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
//
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
......
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
......
if (removeMap.containsKey(i)) {
......
} else {
//读取
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
.....
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
//给msgExt赋值half-message的msgId等属性
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
//处理half-message
listener.resolveHalfMsg(msgExt);
} else {
......
}
}
newOffset = i + 1;
i++;
}

......
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("Check error", e);
}

}

继续跟进listener.resolveHalfMsg中去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}

public void sendCheckMessage(MessageExt msgExt) throws Exception {
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
//注意下这个MsgId就是MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,分布式任务开始时,发送half-message设置
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());

checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}

prouder处理RequestCode.CHECK_TRANSACTION_STATE

  1. produer的ClientRemotingProcessor会调用 this.checkTransactionState(ctx, request);
  2. this.checkTransactionState(ctx, request);会调用producer.checkTransactionState(addr, messageExt, requestHeader);
  3. producer.checkTransactionState(addr, messageExt, requestHeader);处理事务的逻辑代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
//用线程池去处理
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

@Override
public void run() {
......
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
......
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
//...业务自己处理检查事务状态
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}

//内部方法
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}

private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);

//取消息的msgID,优先取事务中的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,如果为空会取,发送来的MsgId
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}

thisHeader.setMsgId(uniqueKey);
//这个值和最开始发送办消息的message是一样的见上面
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}

String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}

try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};

this.checkExecutor.submit(request);
}

处理EndTransaction

在本地事务完成或者检查后都需要给broker发送RequestCode.END_TRANSACTION信息,broker在接到请求流程是

EndTransactionProcessor的processRequest处理具体请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}

//请求来自于TransactionCheck,打印log
if (requestHeader.getFromTransactionCheck()) {
......
} else {
......
}

OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//提交事务
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//校验halfmessage的Offset和Produceguroup的信息等
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//重点,将消息移到真实的Topic和QueueId,这样consumer才能看到
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
//重置标记等信息
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
RemotingCommand sendResult = sendFinalMessage(msgInner);
//删除halfmessage
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//回滚事务
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}