RocketMQ中消息的顺序发送与消费

顺序消息

消息的有序指的是可以按照消息的发送顺序来消息,例如一个订单分为三种状态,分别为:预定中、服务中、已完成。要按照这种顺序消费才有意义。
假定消息 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 优先于 M2 被消费,那么需要 M1 到达消费端被消费后,通知S2,然后 S2 再将 M2 发送到消费端。如下图所示

M1 和 M2 分别发送到两台 S1、S2 服务器,我们并不能保证 M1 先到达MQ集群,也不能保证 M1 优先被消费。
退一步来讲,M2 优先于 M1 到达集群。甚至 M2 被消费后,M1 才到达消费端。所谓顺序消息就乱了。所以这种方案就被否掉了。

我们在来看另外一种,M1 和 M2 发送到同一台服务器,生产者等待 M1 发送成功后在发送 M2,这样就保证了消息的顺序性。如下图所示

理论上我们是可以这么做的,但在实际场景中只要涉及到通信肯定会有网络延迟的情况。如果发送 M1 耗时大于发送M2的耗时,那 M2 仍将被先消费,仍然不能保证消息的顺序。
即使 M1 和 M2 同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费的情况。如下图所示

那如何解决这个问题呢?我们只要将 M1 和 M2 发送到同一个消费者,并且发送 M1 后需要消费端响应之后才能发送 M2。
同样的还有另外一个问题,如果 M1 被发送到消费端之后,消费端1没有响应,那我们是重新发送M1还是继续发送M2呢?
一般我们为了保证消息一定被消费,我们会重发 M1 到另外一个消费端,如下图所示

这样的模型就严格保证消息的顺序,但是还会有问题,消费端1没有响应Server时有两种情况,分别如下

  1. M1 确实没有到达消费端(因某些因素消息而丢失)
  2. 消费端已经消费M1且已经发送响应消息,只是 MQServer 端没有收到。

如果是第二种情况,重发 M1,就会造成 M1 被重复消费的问题。(RocketMQ 不保证消息不重复,如果自己的业务需要保证严格的不重复消息,需要你自己在业务端去重)
如果在 RocketMQ 中,我们如何实现这种消息的有序性呢?
在 RocketMQ 中需要保证顺序的消息要发送到同一个 messageQueue 中,一个 messageQueue 只能被一个消费者消费。所以我们要做到的是生产者 - messageQueue - 消费者之间是一对一对一的关系。
我们可以通过自定义的发送策略来实现消息只发送到同一个队列。因为一个 Topic 会有多个 messageQueue,如果使用 Producer 的默认配置,这个 Producer 会轮流向各个 messageQueue 发送消息。
Consumer 在消费的时候会根据负载均衡策略来消费 messageQueue 中的消息。在 RocketMQ 中,我们只需要实现如下代码就能保证消费发送到同一个 messageQueue。
如下代码相同的 orderId 会发送到同一个 messageQueue 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

/**
* 顺序发送消息示例
*/
public class SequenceMQProducer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 my_sequence_name 的组
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("my_sequence_name");
//指定namesrv服务地址,多个地址以 ; 隔开
defaultMQProducer.setNamesrvAddr("192.168.32.138:9876");
defaultMQProducer.start();
for (int i = 0; i < 100; i++){
try {
int orderId = i % 10;
//创建一个名为 mySequenceTopic 的主题,名为 mySequenceTag 的tag,并将消息转换未 byte[] 数据
Message message = new Message("mySequenceTopic",
"mySequenceTag", ("Hello sequence RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult send = defaultMQProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object o) {
Integer id = (Integer) o;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("发送消息返回结果:%s%n", send);
Thread.sleep(1000);

} catch (Exception e) {
e.printStackTrace();
}
}
}
}

消息顺序消费如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 消息消费端示例
*/
public class SequenceMQConsumer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 sequence_consumer_group_name 的组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sequence_consumer_group_name");
//指定namesrv服务地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.32.138:9876");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅消息生产者中的 mySequenceTopic,* 代表消费该 topic 下所有的 tag,*表示不过滤,可以通过tag来过滤,比如:”myTag”
consumer.subscribe("mySequenceTopic","*");
/**
* MessageListenerOrderly 为顺序监听
*/
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt me : msgs) {
System.out.printf("消费消息:%s %n",new String(me.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}

顺序消费会带来一些性能上的影响,如下几点

  1. 遇到消费失败的消息,无法跳过,当前队列消费暂停
  2. 降低了消息的处理性能

消费端的负载均衡

消费端也会针对 messageQueue 做负载均衡的策略,使每个消费端能够合理的消费每个分区中消息。
消费端会通过 RebalanceService 线程,10秒钟做一次基于 topic 下的所有队列负载

  • 消费端遍历自己的所有 topic,依次调 rebalanceByTopic
  • 根据 topic 获取此 topic 下的所有queue
  • 选择一台 broker 获取基于 group 的所有消费端(有心跳向所有broker注册客户端信息)
  • 选择队列分配策略实例 AllocateMessageQueueStrategy 执行分配算法

在以下几种情况下会触发负载均衡的策略

  • 消费者启动之后
  • 消费者数量发生变更
  • 每10秒会触发检查一次 rebalance

RocketMQ提供了6中分区的分配算法,分别如下

  • (AllocateMessageQueueAveragely)平均分配算法(默认)
  • (AllocateMessageQueueAveragelyByCircle)环状分配消息队列
  • (AllocateMessageQueueByConfig)按照配置来分配队列: 根据用户指定的配置来进行负载
  • (AllocateMessageQueueByMachineRoom)按照指定机房来配置队列
  • (AllocateMachineRoomNearby)按照就近机房来配置队列:
  • (AllocateMessageQueueConsistentHash)一致性hash,根据消费者的cid进行

消息消费端的确认机制

RocketMQ 提供了 ack 机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ 才会认为消息消费成功。
不管是普通消费还是顺序消费,都会有这段代码return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 以及 return ConsumeOrderlyStatus.SUCCESS;
来告诉 RocketMQ 是消费完成的,如果这时候消息消费失败,例如抛出异常或者在我们的业务中认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER或者ConsumeOrderlyStatus.ROLLBACK,RocketMQ 就会认为消息消费失败了。

消息的重试机制

只有当消费模式为集群模式时,Broker 才会自动进行重试,对于广播消息是不会重试的。(RocketMQ 默认为集群模式)
为了保证消息肯定至少被消费一次,RocketMQ 会把消息重新发回到 Broker,在延迟的某个时间点后,再次投递到这个 ConsumerGroup。
如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。然后我们只需要关注死信队列,并对死信队列中的消息做人工的业务补偿操作。
rocketMQ 中默认的重试时间窗口

1
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

但在实际生产中我们不需要重试16次,这样既浪费时间又浪费性能,我们可以将重复次数达到我们想要的结果时如果还是消费失败,那么我们需要将对应的消息进行记录,并且结束重复尝试。如下代码所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 重试次数消费端示例
*/
public class RetryMQConsumer {
public static void main(String[] args) throws MQClientException {
//创建一个名为 my_consumer_group_name 的组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group_name");
//指定namesrv服务地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.32.138:9876");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅消息生产者中的 myTopic,* 代表消费该 topic 下所有的 tag,*表示不过滤,可以通过tag来过滤,比如:”myTag”
consumer.subscribe("myTopic","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt me: msgs) {
System.out.printf("消费消息:%s %n",new String(me.getBody()));
if(me.getReconsumeTimes() == 3){
//可以将对应的消息保存到数据库,以便人工干预
System.out.println("消费消息失败保存到数据库:"+new String(me.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
}
}