RocketMQ分布式事务

事务性消息

关于事务消息的相关概念和原理,请参考相关文章(见参考链接)。
这里直接给出示例代码。

事务消息发送端(TransactProducer.java):
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Sample sender of RocketMQ transactional messages.
 *
 * <p>Configures a {@link TransactionMQProducer} with a name server address, a fixed
 * thread pool and a {@code TransactionListenerLocal}, then sends eight messages to
 * {@code tx_topic}.
 */
public class TransactProducer {

    /**
     * Starts the transactional producer and sends eight messages, printing the local
     * transaction state reported for each send.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if the producer cannot be started
     */
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("tx_group_name");
        producer.setNamesrvAddr("192.168.32.138:9876");

        // Fixed pool of 10 threads handed to the producer.
        ExecutorService pool = Executors.newFixedThreadPool(10);
        producer.setExecutorService(pool);

        producer.setTransactionListener(new TransactionListenerLocal());
        producer.start();

        int index = 0;
        while (index < 8) {
            try {
                // The random business id is used as the message key.
                String businessId = UUID.randomUUID().toString();
                byte[] payload =
                        ("Hello Transaction RocketMQ" + index).getBytes(RemotingHelper.DEFAULT_CHARSET);
                Message outgoing = new Message("tx_topic", "tx_tag", businessId, payload);

                // The second argument is passed through to the listener's executeLocalTransaction.
                Object listenerArg = businessId + "&" + index;
                TransactionSendResult result = producer.sendMessageInTransaction(outgoing, listenerArg);

                System.out.printf("发送状态:%s %n", result.getLocalTransactionState());
            } catch (Exception e) {
                // A failed send is reported and the loop moves on to the next message.
                e.printStackTrace();
            }
            index++;
        }
    }
}

本地事务监听器(TransactionListenerLocal.java):
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Demo {@link TransactionListener} that simulates a local transaction and answers
 * the broker's transaction-state check-backs.
 *
 * <p>The outcome of every simulated local transaction is recorded under the message
 * key, which is also what {@link #checkLocalTransaction(MessageExt)} looks up. Using
 * the same key in both paths is what lets a check-back find the recorded outcome.
 */
public class TransactionListenerLocal implements TransactionListener {

    /** Recorded outcome of each simulated local transaction, keyed by business id (the message key). */
    private static final Map<String, Boolean> results = new ConcurrentHashMap<>();

    /**
     * Runs the simulated local transaction for a message.
     *
     * @param msg the message being sent; its key is used as the business id
     * @param arg the argument supplied by the sender; only logged, and used as the id
     *            when the message carries no key
     * @return {@code COMMIT_MESSAGE} when the simulated transaction succeeded, which lets
     *         the broker deliver the message to consumers; otherwise {@code UNKNOW}, which
     *         leaves the decision to a later {@link #checkLocalTransaction(MessageExt)} call
     *         ({@code ROLLBACK_MESSAGE} would discard the message instead)
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // String concatenation tolerates a null arg, unlike arg.toString().
        System.out.println("执行本地事务:" + arg);
        String id = resolveId(msg, arg);
        boolean rs = saveId(id);
        return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.UNKNOW;
    }

    /**
     * Answers a transaction-state check-back from the broker.
     *
     * @param msg the message whose local transaction state is being queried
     * @return {@code COMMIT_MESSAGE} when a successful outcome is recorded for the message
     *         key; {@code ROLLBACK_MESSAGE} when the recorded outcome is a failure, when
     *         nothing is recorded, or when the message has no key
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String id = msg.getKeys();
        System.out.println("执行事务状态的回查,id:" + id);
        // ConcurrentHashMap rejects null keys, so a key-less message is treated as "no record".
        boolean rs = id != null && Boolean.TRUE.equals(results.get(id));
        System.out.println("回调:" + rs);
        return rs ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
    }

    /** Picks the id to record the outcome under: the message key, or the sender argument if there is no key. */
    private static String resolveId(Message msg, Object arg) {
        String keys = msg.getKeys();
        return keys != null ? keys : String.valueOf(arg);
    }

    /**
     * Simulates the local transaction: it succeeds when the id's hash code is even,
     * and the outcome is recorded so a later check-back can report it.
     */
    private boolean saveId(String id) {
        boolean success = Math.abs(Objects.hashCode(id)) % 2 == 0;
        results.put(id, success);
        return success;
    }
}
事务消息消费端(TransactionConsumer.java):
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

/**
 * Sample consumer for the transactional-message demo.
 *
 * <p>Subscribes to every tag of {@code tx_topic} and prints the body and key of each
 * message it receives.
 */
public class TransactionConsumer {

    /**
     * Configures and starts a push consumer that reads {@code tx_topic} from the first offset.
     *
     * @param args command-line arguments (unused)
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tx_consumer_name");
        consumer.setNamesrvAddr("192.168.32.138:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("tx_topic", "*");

        // Register the message callback. Two listener kinds exist:
        // MessageListenerConcurrently (plain) and MessageListenerOrderly (ordered).
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt me : msgs) {
                    String keys = me.getKeys();
                    // Decode explicitly as UTF-8 instead of the platform default charset; the
                    // demo's sender encodes with RemotingHelper.DEFAULT_CHARSET (UTF-8).
                    String body = new String(me.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.printf("消费消息:%s,业务Id:%s %n", body, keys);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

事务型消息的本地事务有三种状态,分别如下:

  1. COMMIT_MESSAGE:提交事务,消息对 consumer 可见
  2. ROLLBACK_MESSAGE:回滚事务,消息被丢弃,不会投递给 consumer
  3. UNKNOW:状态未知,broker 会定时地回查 producer 的本地事务状态,直到得到明确的提交或回滚结果(回查次数受 broker 配置的上限限制)