资讯专栏INFORMATION COLUMN

消息队列常见问题解析

苏丹 / 3270人阅读

摘要:消息队列带来的问题系统可用性降低系统引入的外部依赖越多,系统越容易出问题。系统复杂性提高加入消息队列后,需要保证消息没有被重复消费,保证消息传递的顺序性等等。

消息队列相关笔记 消息队列的应用场景:

消费者执行过程比较长且生产者不需要消费者返回结果。用于更新索引库,生成商品详情页,发短信。

为什么要使用消息队列:

通过异步处理提高系统性能(削峰、减少响应所需时间);

降低系统耦合性。

削锋作用:通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。

消息队列带来的问题:

系统可用性降低:系统引入的外部依赖越多,系统越容易出问题。如果MQ出问题,整套系统更容易崩溃。

系统复杂性提高:加入消息队列后,需要保证消息没有被重复消费,保证消息传递的顺序性等等。

一致性问题:消息没有被真正消费者正确消费,会导致数据不一致的情况。

JMS:Java Message Service,是一个消息服务的标准和规范,允许应用程序组件基于JaveEE平台创建、发送、接收和读取消息。

JMS的两种消息模型:

点到点模型(P2P):

使用队列作为消息通信载体,一条消息只能被一个消费者使用,未消费的消息在队列中保留直到被消费或者超时。

发布/订阅模型(Pub/Sub):

发布者发布一条信息,通过主题传递给所有的订阅者,在一条消息广播后才订阅的用户收不到该条消息。

JMS的五种消息正文格式:

StreamMessage——Java原始值的数据流

MapMessage——一套键值对

TextMessage——一个字符串对象

ObjectMessage——一个序列化的Java对象

BytesMessage——一个字节的数据流

AMQP:Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议)。

如何保证消息队列的高可用?

以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。

普通集群模式,无高可用性,在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。这个方案提高了吞吐量,但没有高可用性。

镜像集群模式,有高可用性,queue里的消息会存在于多个实例上,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据。该模式的缺点:性能开销过大,带宽压力和消耗很重;不是分布式的,没有扩展性。

Kafka的高可用性:由多个broker组成,每个broker是一个节点;你创建的一个topic会被划分为多个partition,每个partition存储于不同的broker上,每个partition包含部分topic的数据,是一个天然的分布式消息队列。

Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。

每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。

所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。

这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。

ack消息延迟:

如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。→

持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了。→

关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次确保消费端处理完数据的时候,再在程序里 ack 一把。

各类MQ对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单击吞吐量 万 级 万级 10万级 10万级
topic数量对吞吐量的影响 topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 topic从几十个到几百个的时候,吞吐量会大幅度下降
时效性 ms级 微秒级,延迟最低 ms级 ms级
可用性 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 低概率丢失数据 经过参数优化配置,可以做到0丢失 经过参数优化配置,可以做到0丢失
功能支持 功能极其完备 基于erlang开发,所以并发能力很强,性能极其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

代码


    org.springframework
    spring-jms
//生产者
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination queueSolrDestination;
@Autowired
private Destination topicPageDestination;
//更新索引库
jmsTemplate.send(queueSolrDestination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(jsonString);
    }
});
//生成商品详情页
jmsTemplate.send(topicPageDestination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(id + "");
    }
});
//消费者(监听)
@Component
public class PageListener implements MessageListener{
    @Autowired
    private ItemPageService itemPageService;
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            String text = textMessage.getText();
            System.out.println("接收到消息:" + text);
            boolean b = itemPageService.genItemHtml(Long.parseLong(text));
            System.out.println("网页生成结果:" + b);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76162.html

相关文章

  • rocketmq之producer解析

    摘要:所以基于目前的设计,建议关闭自动创建的功能,然后根据消息量的大小,手动创建。如果发送消息,返回结果超时,这种超时不会进行重试了如果是方法本身耗时超过,还未来得及调用发送消息,此时的超时也不会重试。 先来看下producer核心的类设计,如下图: showImg(http://pbdqyl9hh.bkt.clouddn.com/rocketmq/producer%E7%B1%BB%E5%...

    luodongseu 评论0 收藏0
  • JavaScript同步和异步

    摘要:异步如果在函数返回的时候,调用者还不能购得到预期结果,而是将来通过一定的手段得到例如回调函数,这就是异步。的意思是,将回调函数立刻插入消息队列,等待执行,而不是立即执行。 大家好,我是wmingren,小伙伴们都知道JavaScript是单线程的语言,所谓的单线程呢就是指如果有多个任务就必须去排队,前面任务执行完成后,后面任务再执行。到这里我们就产生了一个疑问,既然是单线程的,又怎么会...

    Eirunye 评论0 收藏0
  • 分布式消息队列 RocketMQ 源码分析 —— Message 发送与接收

    摘要:第至行从中获得发布信息。第至行容错策略选择消息队列逻辑。第至行执行发起网络请求。第至行处理消息发送结果,设置响应结果和提示。第至行发送成功,响应。第至行消息格式与大小校验。 摘要: 原创出处 http://www.iocoder.cn/RocketM... 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 RocketMQ 4.0.x 正式版 1、概述 2、Producer 发...

    seasonley 评论0 收藏0
  • 使用canal+Kafka进行数据库同步实践

    摘要:比如,服务数据库的数据来源于服务的数据库服务的数据有变更操作时,需要同步到服务中。第二种解决方案通过数据库的进行同步。并且,我们还用这套架构进行缓存失效的同步。目前这套同步架构正常运行中,后续有遇到问题再继续更新。在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。第一...

    Tecode 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<