无声的消失最为致命,这篇文章就来总结一下前段时间在项目中遇到的一个问题-无声消失的消息,
先来看一下业务场景,现在系统A作为消费方,要分别接收来自生产方C的C1消息与C2消息分别来进行处理。
这里给出的解决方案是:系统A分别订阅生产方C的C1的topic1 与 C2的topic2.接下来给出一个简单的配置与实现模拟。
于是给出了以下配置:
| 生产者 | 消费方1 | 消费方2 | |
|---|---|---|---|
| nameSrv | 192.168.0.102:9876 | 192.168.0.102:9876 | 192.168.0.102:9876 |
| groupName | Group_produce | Group_c | Group_c |
| topic1 | topic1 | topic1 | |
| topic2 | topic2 | topic2 |
写到这里有经验的朋友不知道有没有看出问题所在。下面我将给出该场景的模拟。
环境配置
这里在本机上使用docker搭建了一个RocketMq的环境模拟,其中搭建教程 Mac m1搭建Rocket Mq,已经试过完全按照教程来,没问题。其他电脑配置,大家就自行在网上找一些教程吧。
生产者代码模拟
/**
* 生产者生产消息
* @Author gorge
* @Version 1.0
* @Date 2023/5/6 20:43
**/
public class MqProduceTest {
public static DefaultMQProducer producer = new DefaultMQProducer("Group_produce");
@Resource
public static void main(String[] args) throws MQClientException {
Scanner sc = new Scanner(System.in);
producer.setRetryTimesWhenSendFailed(3);
producer.setNamesrvAddr("192.168.0.102:9876");
producer.start();
String topic="";
String message="";
System.out.println("开始启动:");
while(true){
System.out.println("请输入topic:");
topic = sc.next();
System.out.println("请输入消息体:");
message = sc.next();
sendMessage(topic,message);
}
}
public static void sendMessage(String topic,String message){
System.out.println("topic:"+topic);
Message sendMsg = new Message(topic,"*",message.getBytes(StandardCharsets.UTF_8));
SendResult sendResult = null;
try {
sendResult = producer.send(sendMsg);
System.out.println(topic+"end");
} catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
System.out.println(e);
Thread.currentThread().interrupt();
}
}
}
消费者A分别订阅两个topic
消费者A订阅topic1
/**
* @Author gorge
* @Version 1.0
* @Date 2023/5/6 20:43
**/
public class MqConsumer1 {
public static void main(String[] args) throws MQClientException {
// 创建接收消息的对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group_c");
consumer.setNamesrvAddr("192.168.0.102:9876");
consumer.subscribe("topic1","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt messageExt : list){
if(messageExt.getReconsumeTimes()>1){
continue;
}
System.out.println("0--topic1--x"+(new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
消费者A来订阅topic2
/**
* @Author gorge
* @Version 1.0
* @Date 2023/5/6 20:44
**/
public class MqConsumer2 {
public static void main(String[] args) throws MQClientException {
// 创建接收消息的对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group_c");
consumer.setNamesrvAddr("192.168.0.102:9876");
consumer.subscribe("topic2","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@SneakyThrows
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt messageExt : list){
if(messageExt.getReconsumeTimes()>1){
continue;
}
System.out.println("0--topic2--x"+(new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET)));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
场景模拟
生产者生产消息
开始启动:
请输入topic:
topic1
请输入消息体:
topic1mes_2
请输入topic:
topic2
请输入消息体:
topic2mes_2
请输入topic:
topic1
请输入消息体:
topic1mes_3
请输入topic:
topic1
请输入消息体:
topic1mes_4
请输入topic:
topic2
请输入消息体:
topic2mes_3
请输入topic:
topic2
请输入消息体:
topic2mes_4
消费者消费消息
消费者MqConsumer1 订阅的topic1获取到的消息
0--topic1--xtopic1mes_1
0--topic1--xtopic1mes_2
0--topic1--xtopic1mes_4
0--topic1--xtopic2mes_3
0--topic1--xtopic2mes_1
消费者MqConsumer2 订阅的topic2获取到的消息
0--topic2--xtopic2mes_1
0--topic2--xtopic1mes_3
0--topic2--xtopic2mes_4
0--topic2--xtopic2mes_2
看到这里相信大家应该都发现了一个奇怪的现象—>消费组1订阅的topic1,但是却消费到了topic2的内容。消费组2订阅的topic2,但是却消费带了topic1的内容,导致了消费内容混乱。
RocketMq的消费模型
有了上面的现象,凭脑袋在那里空想肯定是没有用的。所以对RockerMq的消费模型进行了回顾,看了一些资料。
RocketMq是标准的发布-订阅模型。

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。
我的理解可能也到此为止了。所以想当然的认为消费组名只是为了标识该消费组,真正起作用的是该消费组订阅的主题。
下面找了一张RocketMq的消费图解。
RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响。但是消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息
但是我们上面出现的现象是用topic对消息进行了区分,还依然出现消费混乱的现象。这里就要引出一个概念订阅关系一致性
订阅关系的一致性是指同一个消费组下所有的Consumer实例所订阅的Topic与Tag以及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息消失。
解决方案及总结
经过上述分析,是因为前期的配置违背了订阅关系一致性的原则,将配置中的消费组名加以区分,问题解决。
因此,在使用RocketMq的时候,从消费者的角度来讲,我们要设计多个消费者组订阅了多个Topic,并且每个消费者组里面的多个消费者实例的订阅关系需要保持一致。
问题:
RocketMq中的tag可以过滤需要处理的消息,那么我们是不是可以违背订阅关系一致性,在消息层面使用tag进行区分呢?
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~