摘要:消息队列带来的问题系统可用性降低系统引入的外部依赖越多,系统越容易出问题。系统复杂性提高加入消息队列后,需要保证消息没有被重复消费,保证消息传递的顺序性等等。
消息队列相关笔记 消息队列的应用场景:
消费者执行过程比较长且生产者不需要消费者返回结果。用于更新索引库,生成商品详情页,发短信。
为什么要使用消息队列:通过异步处理提高系统性能(削峰、减少响应所需时间);
降低系统耦合性。
削锋作用:通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。
消息队列带来的问题:系统可用性降低:系统引入的外部依赖越多,系统越容易出问题。如果MQ出问题,整套系统更容易崩溃。
系统复杂性提高:加入消息队列后,需要保证消息没有被重复消费,保证消息传递的顺序性等等。
一致性问题:消息没有被真正消费者正确消费,会导致数据不一致的情况。
JMS:Java Message Service,是一个消息服务的标准和规范,允许应用程序组件基于JaveEE平台创建、发送、接收和读取消息。
JMS的两种消息模型:点到点模型(P2P):
使用队列作为消息通信载体,一条消息只能被一个消费者使用,未消费的消息在队列中保留直到被消费或者超时。
发布/订阅模型(Pub/Sub):
发布者发布一条信息,通过主题传递给所有的订阅者,在一条消息广播后才订阅的用户收不到该条消息。
JMS的五种消息正文格式:StreamMessage——Java原始值的数据流
MapMessage——一套键值对
TextMessage——一个字符串对象
ObjectMessage——一个序列化的Java对象
BytesMessage——一个字节的数据流
AMQP:Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议)。
如何保证消息队列的高可用?以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。
普通集群模式,无高可用性,在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。这个方案提高了吞吐量,但没有高可用性。
镜像集群模式,有高可用性,queue里的消息会存在于多个实例上,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据。该模式的缺点:性能开销过大,带宽压力和消耗很重;不是分布式的,没有扩展性。
Kafka的高可用性:由多个broker组成,每个broker是一个节点;你创建的一个topic会被划分为多个partition,每个partition存储于不同的broker上,每个partition包含部分topic的数据,是一个天然的分布式消息队列。
Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。
每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。
所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。
这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。
ack消息延迟:
如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。→
持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了。→
关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次确保消费端处理完数据的时候,再在程序里 ack 一把。
各类MQ对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单击吞吐量 | 万 级 | 万级 | 10万级 | 10万级 |
topic数量对吞吐量的影响 | topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 | topic从几十个到几百个的时候,吞吐量会大幅度下降 | ||
时效性 | ms级 | 微秒级,延迟最低 | ms级 | ms级 |
可用性 | 高 | 高 | 非常高,分布式架构 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 低概率丢失数据 | 经过参数优化配置,可以做到0丢失 | 经过参数优化配置,可以做到0丢失 | |
功能支持 | 功能极其完备 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低 | MQ功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
代码
org.springframework spring-jms
//生产者 @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueSolrDestination; @Autowired private Destination topicPageDestination; //更新索引库 jmsTemplate.send(queueSolrDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(jsonString); } }); //生成商品详情页 jmsTemplate.send(topicPageDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(id + ""); } });
//消费者(监听) @Component public class PageListener implements MessageListener{ @Autowired private ItemPageService itemPageService; @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println("接收到消息:" + text); boolean b = itemPageService.genItemHtml(Long.parseLong(text)); System.out.println("网页生成结果:" + b); } catch (JMSException e) { e.printStackTrace(); } } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/76162.html
摘要:所以基于目前的设计,建议关闭自动创建的功能,然后根据消息量的大小,手动创建。如果发送消息,返回结果超时,这种超时不会进行重试了如果是方法本身耗时超过,还未来得及调用发送消息,此时的超时也不会重试。 先来看下producer核心的类设计,如下图: showImg(http://pbdqyl9hh.bkt.clouddn.com/rocketmq/producer%E7%B1%BB%E5%...
摘要:异步如果在函数返回的时候,调用者还不能购得到预期结果,而是将来通过一定的手段得到例如回调函数,这就是异步。的意思是,将回调函数立刻插入消息队列,等待执行,而不是立即执行。 大家好,我是wmingren,小伙伴们都知道JavaScript是单线程的语言,所谓的单线程呢就是指如果有多个任务就必须去排队,前面任务执行完成后,后面任务再执行。到这里我们就产生了一个疑问,既然是单线程的,又怎么会...
摘要:第至行从中获得发布信息。第至行容错策略选择消息队列逻辑。第至行执行发起网络请求。第至行处理消息发送结果,设置响应结果和提示。第至行发送成功,响应。第至行消息格式与大小校验。 摘要: 原创出处 http://www.iocoder.cn/RocketM... 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 RocketMQ 4.0.x 正式版 1、概述 2、Producer 发...
摘要:比如,服务数据库的数据来源于服务的数据库服务的数据有变更操作时,需要同步到服务中。第二种解决方案通过数据库的进行同步。并且,我们还用这套架构进行缓存失效的同步。目前这套同步架构正常运行中,后续有遇到问题再继续更新。在微服务拆分的架构中,各服务拥有自己的数据库,所以常常会遇到服务之间数据通信的问题。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。第一...
阅读 3548·2021-11-15 11:36
阅读 1041·2021-11-11 16:55
阅读 663·2021-10-20 13:47
阅读 2975·2021-09-29 09:35
阅读 3310·2021-09-08 10:45
阅读 2536·2019-08-30 15:44
阅读 799·2019-08-30 11:10
阅读 1412·2019-08-29 13:43