资讯专栏INFORMATION COLUMN

如何解决MQ消息消费顺序问题

Atom / 1270人阅读

摘要:利用的高级特性特性是一种负载均衡的机制。在一个消息被分发到之前,首先检查消息属性。属性为某个值的消息单个消息或消息集合在描述,和的对应关系,以及负载均衡策略时。同样做到了保证消息的顺序情况下,均衡消费的消费消息。

通常mq可以保证先到队列的消息按照顺序分发给消费者消费来保证顺序,但是一个队列有多个消费者消费的时候,那将失去这个保证,因为这些消息被多个线程并发的消费。但是有的时候消息按照顺序处理是很重要的,那我们该如何来保证消息的顺序呢,下面将从activemq和rocketmq来看看,它们是如何来保证消息的顺序问题的?我们还可以有别的处理方案么?

Activemq处理方案

1、利用Activemq的高级特性:consumer之独有消费者(exclusive consumer)

在ActiveMQ4.x中可以采用Exclusive Consumer,broker会从queue中,一次发送消息给一个消费者,这样就避免了多个消费者并发消费的问题,从而保证顺序,配置如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

当在接收信息的时候有一个或者多个备份接收消息者和一个独占消息者的同时接收时候,无论两者创建先后,在接收的时候,均为独占消息者接收。

当在接收信息的时候,有多个独占消费者的时候,只有一个独占消费者可以接收到消息。

当有多个备份消息者和多个独占消费者的时候,当所有的独占消费者均close的时候,只有一个备份消费者接到到消息。

当主消费者挂了话,会立刻启用故障切换转移到下一台消费者继续消费

                                   图1

独占消息就是在有多个消费者同时消费一个queue时,可以保证只有一个消费者可以消费消息,这样虽然保证了消息的顺序问题,不过也带来了一个问题,就是这个queue的所有消息将只会在这一个主消费者上消费,其他消费者将闲置,达不到负载均衡分配,而实际业务我们可能更多的是这样的场景,比如一个订单会发出一组顺序消息,我们只要求这一组消息是顺序消费的,而订单与订单之间又是可以并行消费的,不需要顺序,因为顺序也没有任何意义,有没有办法做到呢?答案是可以的,下面就来看看activemq的另一个高级特性之messageGroup。

2、利用Activemq的高级特性:messageGroups

Message Groups特性是一种负载均衡的机制。在一个消息被分发到consumer之前,broker首先检查消息JMSXGroupID属性。如果存在,那么broker会检查是否有某个consumer拥有这个message group。如果没有,那么broker会选择一个consumer,并将它关联到这个message group。此后,这个consumer会接收这个message group的所有消息,直到:

Consumer被关闭

Message group被关闭,通过发送一个消息,并设置这个消息的JMSXGroupSeq为-1

                                   图2

配置如下:

bytesMessage.setStringProperty("JMSXGroupID", "constact-20100000002");
bytesMessage.setIntProperty("JMSXGroupSeq", -1);

如上图所示,同一个queue中,拥有相同JMSXGroupID的消息将发往同一个消费者,解决顺序问题,不同分组的消息又能被其他消费者并行消费,解决负载均衡的问题,两全其美啦!

问题讨论,除了上述Activemq为我们提供的方案,我们是否不依赖这两种特性,也能解决顺序问题呢?

Rocketmq处理方案

那rocketmq又是如何保证消息顺序消费的问题呢?

Rocketmq跟传统的MQ有一点区别,这里有必要讲一下topic的概念,Topic是RocketMQ中的一个重要概念,RocketMQ的各组件都是围绕着Topic建立起对应关系的,在RocketMQ官方文档和本文中, Topic在不同的语境下被赋予了两种不同的语义:

1)消息的Topic属性值:在描述Consumer的订阅设置信息或消息的属性时。

2)Topic属性为某个值的消息(单个消息或消息集合):在描述Broker,Producer和Consumer的对应关系,Queue以及负载均衡策略时。

topic和queue的对应关系是一个topic拥有多个queue,当producer往broken发送消息时,消息会存储在topic下的不同队列中,而一个队列只会被一个consumer消费,这样消息户被均衡负载到不同的队列下,也就是会被多个消费者并行消费,顺序就无法保证了。该怎么办呢?答案是把需要顺序消费的消息发送到同一台broker server下的同一个队列,而这些消息也只会被同一个消费者消费,这样就可以保证严格的顺序了,如下图:

1、消息要有顺序,首先得保证producer发送消息有顺序,如上图msg1,msg2,msg3发送到queue0是要有顺序的,只要producer等待前面的消息发送成功,在发送后面的消息就完全可以保证了,

2、假设msg1发送给consumer1,消费没有响应,该怎么办呢?是继续发送msg2还是重新发送msg1,一般为了保证消息一定被消费,肯定会选择重发msg1到下一台消费者consumer2。

3、消费端1没有响应Server时有两种情况,一种是msg1确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费msg2且已经发送响应消息,只是MQ Server端没有收到。如果是第二种情况,重发msg1,就会造成msg1被重复消费。也就引入了消息重复问题,那就要幂等了。

Rocketmq同样做到了保证消息的顺序情况下,均衡消费的消费消息。

综上,我看到,在分布式系统中,要想消息有顺序的被消费,无论是Activemq还是Rocketmq都要想办法让有顺序的消息被同一消费者消费,而不是并发的消费,在消费者消费成功后,接着才会消费下一个消息,这样就可以保证了严格的顺序。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/69153.html

相关文章

  • 关于MQ的几件小事(六)消息积压在消息队列里怎么办

    摘要:紧接着征用倍的机器来部署,每一批消费一个临时的消息。这种做法相当于临时将资源和资源扩大倍,以正常速度的倍来消费消息。解决方案这种情况下,实际上没有什么消息挤压,而是丢了大量的消息。 1.大量消息在mq里积压了几个小时了还没解决 场景: 几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多。线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消...

    SwordFly 评论0 收藏0
  • 关于MQ的几件小事(五)如何保证消息顺序执行

    摘要:一个对应一个,但是里面进行了多线程消费,这样也会造成消息消费顺序错误。保证消息的消费顺序拆分多个,每个一个,就是多一些而已,确实是麻烦点这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。 1.为什么要保证顺序 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例: 比如通过mysql binlog进行两个数据...

    h9911 评论0 收藏0
  • 使用canal+Kafka进行数据库同步实践

    摘要:比如,服务数据库的数据来源于服务的数据库服务的数据有变更操作时,需要同步到服务中。第二种解决方案通过数据库的进行同步。并且,我们还用这套架构进行缓存失效的同步。目前这套同步架构正常运行中,后续有遇到问题再继续更新。在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。第一...

    Tecode 评论0 收藏0
  • RocketMq消息中间件介绍

    摘要:消息生产者,负责发消息到。消息消费者,负责从上拉取消息进行消费,消费完进行。集群部署端完全消费正常后在进行手动确认。消息发送成功后,服务器返回确认消息给生产者。根据本地事务执行的结果向发送提交或回滚消息。 RabbitMQerlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致RabbitMQ的性能急剧下降。...

    goji 评论0 收藏0
  • Java面试之消息队列

    摘要:将消息持久化成功之后,向发送方确认消息已经发送成功,此时消息为半消息。发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,仍按照步骤对半消息进行操作。1.应用场景 解耦 异步 流量消峰 日志记录 2.重复消息的解决方案 消费端处理消息的业务逻辑保持幂等性 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现...

    番茄西红柿 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<