RocketMq分布式事务采用的是XA协议。其中的TransactionManager由Broker担任。
原理
- producer发送half-Message(prepare)给broker。
- 发送成功后,producer侧开始执行本地事务,sendResult会包含transactionId和本地事务进行绑定,之后broker反查消息需通过这个ID来处理事物。(用户实现)
- 本地事务执行成功后,发送commit或者rollback状态给broker。
- Broker确认是否收到了Producer发送的commit或者rollback的消息
- Broker收到消息
- 如果收到的是commit,认为事务提交成功,交由consumer处理
- 如果收到的是rollback,认为事务回滚,consumer看不到本条消息。broker删除half-Message。(rocketmq会先将消息发送到一个事务专用的topic中QueueId为0,然后如果commit成功会将消息移到real_topic中,消费者订阅的是real_topic这时候就能看到消息了,如果rollback是不会移动的消费者就看不到这条消息)
- Broker没收到消息
- borker定时回查本地事务,如果本地事务已经执行返回commit;如果本地事务未执行rollback;(用户实现)
- commit:返回commit,说明事务已经提交,consumer进行执行
- rollback:回滚:
- unknown:一直会差到成功为止
- borker定时回查本地事务,如果本地事务已经执行返回commit;如果本地事务未执行rollback;(用户实现)
- Broker收到消息
- 事务的消费,和传统消息消费没什么区别,不过要注意,rocketmq的消费者侧的本地事务如果失败了,需要自行解决数据一致性,毕竟rocketmq整体事务回滚代价太大了
官方Demo
发送的方式
1 | public class TransactionProducer { |
源码解析
Producer
TransactionMQProducer关键代码
1 | public class TransactionMQProducer extends DefaultMQProducer { |
defaultMQProducerImpl的相关代码
1 | public void initTransactionEnv() { |
broker
主要有3个部分
接收半消息消息
处理half-message的入口在SendMessageProcessor.processRequest()方法中,前面介绍消息发送时候介绍过这里不在过多赘述,最终我们跟到SendMessageProcessor.sendMessage方法中,这里在处理发送的事件时候会判断消息的PROPERTY_TRANSACTION_PREPARED标记是否为true如果为true。会调用brokerController.getTransactionalMessageService().prepareMessage,具体如下代码
1 | private RemotingCommand sendMessage(final ChannelHandlerContext ctx, |
继续跟下去:brokerController.getTransactionalMessageService中TransactionalMessageService的实现类TransactionalMessageServiceImpl的prepareMessage方法
1 |
|
继续:
1 | public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) { |
回查half-message
brokerController在启动时候,会启动一个检查线程定期林旭RMQ_SYS_TRANS_HALF_TOPIC这个Topic中是否有过期的half-message,然后对这个half-message的producer发送RequestCode.CHECK_TRANSACTION_STATE的请求
过程和代码如下:
- brokerController在initialTransaction初始化TransactionalMessageCheckService
- brokerController在start时候调用TransactionalMessageCheckService的start方法启动线程
- TransactionalMessageCheckService每隔1分钟轮询一次RMQ_SYS_TRANS_HALF_TOPIC这个topic,发现过期half-message后通知producer。
TransactionalMessageCheckService关键代码
1 |
|
继续跟TransactionalMessageServiceImpl.check
1 |
|
继续跟进listener.resolveHalfMsg中去
1 | public void resolveHalfMsg(final MessageExt msgExt) { |
prouder处理RequestCode.CHECK_TRANSACTION_STATE
- produer的ClientRemotingProcessor会调用 this.checkTransactionState(ctx, request);
- this.checkTransactionState(ctx, request);会调用producer.checkTransactionState(addr, messageExt, requestHeader);
- producer.checkTransactionState(addr, messageExt, requestHeader);处理事务的逻辑代码如下
1 |
|
处理EndTransaction
在本地事务完成或者检查后都需要给broker发送RequestCode.END_TRANSACTION信息,broker在接到请求流程是
EndTransactionProcessor的processRequest处理具体请求
1 |
|