摘要:近期对消息队列比较感兴趣因此特意看了一下相关的知识不过在学时对的消息模型总是理解的不透彻于是在官网上找了一篇介绍消息模型的文章详细地看了一下还是要感叹一下啊官网的文章果然是最权威的看了以后有了不小的收获下面是我学习消息模型时的记录其内容大部
近期对消息队列比较感兴趣, 因此特意看了一下 RabbitMQ 相关的知识, 不过在学 RabbitMQ 时, 对 AMQP 的消息模型总是理解的不透彻, 于是在官网上找了一篇介绍 AMQP 消息模型的文章, 详细地看了一下.
还是要感叹一下啊, 官网的文章果然是最权威的, 看了以后有了不小的收获.
下面是我学习 AMQP 消息模型时的记录, 其内容大部分是翻译自官网, 部分添加了自己的理解.
https://www.rabbitmq.com/tuto...
AMQP 消息模型简介AMQP 的消息模型如下图所示:
通过此图我们可以知道, 一个消息的发送流程有如下几个步骤:
消息生产者将消息发布(Public)到 Exchange 中.
Exchange 根据队列的绑定关系将消息分发到不同的 Queue 中.
AMQP broker 根据订阅规则将消息发送给消费者 或 消费者自行根据需要从消息队列中获取消息.
Exchange 和 Exchange 类型Exchange 的主要任务是接收消息并将消息路由到0个或多个 Queue 中, 而路由的算法受 Exchange 类型和绑定(binding) 关系的影响. AMQP 0-9-1 broker 提供如下四个 exchange 类型:
类型 | 默认预定义的名字 |
---|---|
Direct Exchange | 空字符串和 amq.direct |
Fanout Exchange | amq.fanout |
Topic Exchange | amq.topic |
Headers Exchange | amq.match(在 RabbitMQ 中, 额外提供amq.headers) |
每个 Exchange 都有如下几个属性:
Name, Exchange 的 名字
Durability, 是否是持久的 Exchange, 当为真时, broker 重启后也会保留此 Exchange
Auto-delete, 当为真时, 如果所有绑定的的 Queue 都不再使用时, 此 Exchange 会自动删除
关于默认 Exchange默认的 exchange 是一个由 broker 预创建的匿名的(即名字是空字符串) direct exchagne. 对于简单的程序来说, 默认的 exchange 有一个实用的属性: 如果没有显示地绑定 Exchnge, 那么创建的每个 queue 都会自动绑定到这个默认的 exchagne 中, 并且此时这个 queue 的 route key 就是这个queue 的名字.
例如当我们声明了一个名为 "search-indexing-online" 的 queue, 那么 AMQP broker 会以 "search-indexing-online" 作为 route key 将此 queue 绑定到默认的 exchange 中. 因此当一个消息以 route key 为 "search-indexing-online" 投递到默认的 exchange 中时, 此消息就会被路由到这个 queue 中去. 换句话说, 由于有默认的 exchagne 的存在, 我们就好像可以直接将消息投递到指定的 queue 中去而不需要经过 exchange 一样.
例如:
Send:
public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); channel.close(); connection.close(); } }
Recv:
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
在这个例子中, 我们并没有定义 exchange, 也没有显示地将 queue 绑定到 exchange 中, 因此 queue "hello" 就自动绑定到默认的 exchange 中了, 并且在默认的 exchange 中, 其 route key 和 queue 名一致, 即 "hello".
由于这个原因, 我们就可以使用:
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
来发送消息. 调用 channel.basicPublish 时, 第一个参数是 exchange 名, 为空就是默认的 exchange, 第二个参数是 route key, 和 queue 名相同.
direct exchangedirect exchange 可以使用如下图表示:
direct exchange 根据消息的 route key 来将消息分发到不同的 queue 中. direct exchange 适合用于消息的单播发送. direct exchange 的工作流程如下:
一个 queue 使用 K 作为 route key 绑定到 direct exchange 中.
当direct exchange 收到一个 route key 为 R 的消息时, 如果 R == K, 则此 exchange 会将此消息路由到此 queue 中.
direct exchange 经常用于在多个 worker 中分配任务(即一个 Master 和多个相同的 Slave). 当使用这个模型时, 需要注意的是:
When doing so, it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers and not between queues.
即 AMQP 0-9-1 的负载均衡是以consumer为单位的, 而不是以 queue 为单位.
fanout exchange一个 fanout exchange 会将消息分发给所有绑定到此 exchange 的 queue 中, 而不会考虑 queue 的 route key. 即如果有 N 个 Queue 绑定到一个 fanout exchange 时, 那么当此 exchange 收到消息时, 会将此消息分发到这 N 个 queue 中. 由于此性质, fanout exchange 也常用消息的广播(broadcast).
fanout 可以使用下图表示:
topic exchange 会根据 route key 将消息分发到与此消息的 route key 相匹配的并且绑定到此 exchagne 中的 queue 中(如果有多个 queue 使用了相同的 route key 绑定到此 exchange, 那么这些 queue 都会收到消息). 根据此性质, topic exchange 经常用于实现 publish/subscribe 模型, 即消息的多播模型.
header exchangeheader exchange 不使用 route key 作为路由的依据, 而是使用消息头属性来路由消息.
QueueAMQP 中的 队列 的概念和其他消息队列中 队列 的概念类似, 它有如下几个重要的概念:
Name, 名字
Durable, 是否是持久的. 当为真时, 即使 broker 重启时, 此 queue 也不会被删除.
Exclusive, 是否是独占的, 当为真时, 表示此 queue 只能有一个消费者, 并且当此消费者的连接断开时, 此 queue 会被删除.
Auto-delete, 当为真时, 此 队列 会在最后一个消费者取消订阅时被删除.
在使用一个 队列 时, 需要先进行声明. 如果我们声明的队列不存在, 那么 broker 就会自动创建它. 不过如果此队列已经存在时, 我们就需要注意了, 若我们声明的队列的属性和已存在的队列的属性一致, 则不会有任何的问题, 但是如果先后两次声明的队列的属性不一致, 则会有 PRECONDITION_FAILED 错误(错误码为406).
关于队列名AMQP 的队列名不能以 "amq." 开头, 因为这样的队列名是 AMQP broker 内部所使用的. 当我们使用了这样的队列名时, 那么会有一个 ACCESS_REFUSED 错误(错误码为 403)
关于持久队列持久队列会被持久化到磁盘中, 因此即使 broker 重启了, 持久队列也依然存在.
不过需要注意的是, 不要将持久队列和消息的持久化混淆. 当 broker 重启时, 持久队列会自动重新声明, 然而只有队列中的持久化消息(persistent message)才会被恢复.
队列的绑定关系是 exchagne 用于消息路由的规则, 即一个 exchange 能够将消息路由到某个队列的前提是此队列已经绑定到这个 exchange 中了. 当队列绑定到一个 exchange 中时, 我们还可以设置一个额外的参数, 即 route key, 这个 key 会被 direct exchange 和 topic exchange 作为额外的路由信息而使用, 换句话说, route key 扮演着过滤器的角色.
当一个消息没有被路由到任意的队列时(例如此 exchange 没有任何的 queue 绑定着), 那么此时会根据消息的属性来决定是将此消息丢弃还是返回给生产者.
AMQP 0-9-1 支持两种消息分发模式:
push 模式, 即 broker 主动推送消息给消费者
pull 模式, 即消费者主动从 broker 中拉取消息.
在 push 模式时, 应用程序需要告知 broker 它对哪些消息感兴趣, 即也就是我们所说的订阅一个消息主题. 每个消费者都有一个惟一的标识符, 即consumer tag, 我们可以用这个 tag 来取消一个消费者对某个主题的订阅(unsubscribe).
消息的 ACKAMQP 0-9-1 有两种消息 ACK 模式:
自动 ACK 模式
手动 ACK 模式
在自动 ACK 模式下, 当 broker 发送消息成功后, 会立即将此消息从消息队列中删除, 而不会等待消费者的 ACK 回复. 而在手动 ACK 模式下, 当 broker 发送消息给消费者时, 不会立即将此消息删除, 而是需要等待消费者的 ACK 回复后才会删除消息. 因此在手动 ACK 模式下, 当消费者收到消息并处理完成后, 需要向 broker 显示地发送 ACK 指令.
在手动 ACK 模式下, 如果消费者因为意外的 crash 而没有发送 ACK 给 broker, 那么此时 broker 会将此消息转发给其他的消费者(如果此时没有消费者了, 那么 broker 会缓存此消息, 直到有新的消费者注册).
当一个消费者处理消息失败或此时不能处理消息时, 那么可以给 broker 发送一个拒绝消息的指令, 并且可以要求 broker 丢弃或重新分发此消息.
不过需要注意的是, 如果此时只有一个消费者, 那么当此消费者拒收消息并要求 broker 重新分发此消息时, 那么就会造成了此消息不断地分发和拒收, 形成了死循环.
通过预取消息机制, 消费者可以一次性批量取出消息, 然后在处理后对这些批量消息进行统一的 ACK 回复, 这样可以提高消息的吞吐量.
不过, 需要注意的时, RabbitMQ 仅支持 channel 级别的预取消息的数量配置, 不支持基于连接的预取消息数量配置.
AMQP 的连接是长连接, 它是一个使用 TCP 作为可靠传输的应用层协议.
通道(Channel)AMQP 不推荐一个应用程序发起多个对 broker 的连接, 因为这样会消耗系统资源并且也不利于防火墙的配置. 但是如果应用程序确实需要有多个不互相干扰的连接来进行不同的操作时该怎么办呢? 为了解决这个问题, AMQP 引入了 Channel 的 概念. 在 AMQP 0-9-1 中, 一个与 broker 的连接是被多个 Channel 复用的, 因此我们可以将 channel 理解为: 一个共享同一个 TCP 连接的轻量级的连接.
基于同一个 TCP 连接的两个不同的 channel 直接是不会有任何的干扰的(在逻辑上可以等效地理解为两个独立的连接), 因此客户端和 broker 之间交互时, 需要附带上 channel id.
通常来说, 在一个多线程消费消息的模型中, 每个线程多带带打开一个 channel 是一个推荐的做法, 而最好不要在各个线程中共享一个 channel.
为了在一个 broker 中实现不同的相互隔离的环境(例如每个环境中有不同的用户, 不同的 exchange, 不同的队列等), AMQP 引入了一个叫做 virtual host(vhost) 的概念. 在连接 broker 时, 客户端可以指定需要使用哪个 vhost.
本文由 yongshun 发表于个人博客, 采用 署名-相同方式共享 3.0 中国大陆许可协议.
Email: yongshun1228@gmail.com
本文标题为: RabbitMQ AMQP 消息模型攻略
本文链接为: https://segmentfault.com/a/1190000007123977
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/66128.html
摘要:后续介绍交换机,生产者直接将消息投递到中。消息,服务器和应用程序之间传送的数据,由和组成。也称为消息队列,保存消息并将它们转发给消费者。主要是应为和有一个绑定的关系。 showImg(https://img-blog.csdnimg.cn/20190509221741422.gif); showImg(https://img-blog.csdnimg.cn/20190731191914...
摘要:通过以上分析我们可以得出消息队列具有很好的削峰作用的功能即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 16k)。地址:https://github.com/Snailclimb... 本文内容思维导图:showImg(ht...
摘要:需要特别明确的概念交换机的持久化,并不等于消息的持久化。消息的处理,是有两种方式,一次性。在上述示例中,使用的,意味着接收全部的消息。注意与是两个不同的队列。后端处理,可以针对每一个启动一个或多个,以提高消息处理的实时性。 RabbitMQ与PHP(一) 项目中使用RabbitMQ作为队列处理用户消息通知,消息由前端PHP代码产生,处理消息使用Python,这就导致代码一致性问题,调...
摘要:慕课网消息中间件极速入门与实战学习总结时间年月日星期三说明本文部分内容均来自慕课网。 慕课网《RabbitMQ消息中间件极速入门与实战》学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:无 学习源码:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:消息持久化控制的属性就是消息的持久化。当生产者发送的消息路由键为时,两个消费者都会收到消息并处理当生产者发送的消息路由键为时,只有消费者可以接收到消息。八的消息确认机制在中,可以通过持久化数据解决服务器异常的数据丢失问题。 一、内容大纲&使用场景 1. 消息队列解决了什么问题? 异步处理 应用解耦 流量削锋 日志处理 ...... 2. rabbitMQ安装与配置 3. Java操...
阅读 1589·2021-11-22 14:45
阅读 1007·2021-11-17 09:33
阅读 3292·2021-09-02 09:48
阅读 946·2019-08-30 15:54
阅读 2726·2019-08-30 15:53
阅读 2514·2019-08-30 12:54
阅读 2216·2019-08-29 12:37
阅读 2389·2019-08-26 13:58