摘要:消息确认为,会等待的显式确认。在消息发送到之后会立刻路由到中,因此未持久化的在重启后会丢失元数据以及绑定,对和消息的持久化无影响。指定如果一个或者有多个的情况下,只有最大的那个才会生效。要求集群中至少要有一个磁盘节点,储存了所有的元数据。
Connection & Channel
Connection 代表一个 TCP 连接,Channel 是建立在 Connection 上的虚拟连接。RabbitMQ 每条指令都是通过 Channel 完成的。
对于 OS 而言,创建和销毁 TCP 连接的代价非常高,在高峰期很容易遇到瓶颈。程序中一般会有多个线程需要与 RabbitMQ建立通信,消费或生产消息,通过 TCP 连接复用来减少性能开销。
Connection 可以创建多个 Channel ,但是 Channel 不是线程安全的所以不能在线程间共享。
Connection 在创建时可以传入一个 ExecutorService ,这个线程池时给该 Connection 上的 Consumer 用的。
Channel.isOpen 以及 Connection.isOpen 方法是同步的,因此如果在发送消息时频繁调用会产生竞争。我们可以认为在 createChannel 方法后 Channel 以及处于开启状态。若在使用过程中 Channel 关闭了,那么只要捕获抛出的 ShutDownSignalException 就可以了,同时建议捕获 IOException 以及 SocketException 防止连接意外关闭。
Exchange & Queue消费者和生产者都可以声明一个已经存在的 Exchange 或者 Queue ,前提是参数完全匹配现有的 Exchange 或者 Queue,否则会抛出异常。
QueueDeclare 参数:
exclusive: 排他队列,只有同一个 Connection 的 Channel 可以访问,且在 Connection 关闭或者客户端退出后自动删除,即使 durable 为 true 。
queuePurge(String queue):清空队列
Exchange 可以绑定另一个 Exchange:exchangeBind(String destination, String source, String routeKey), 从 source 到 destination
若业务允许,则最好预先创建好 Exchange 以及 Queue 并进行绑定(rabbitmqadmin),防止 Exchange 没有绑定 Queue 或 绑定错误的 Queue 而导致消息丢失(关键信息应当使用 mandatory 参数)。
Alternate Exchange: 在 Channel.exchangeDeclare 时添加 alternate-exchange 参数或在 Policy 中声明。mandatory 为 true 时,未被路由的消息会被发送到 Alternate Exchange 。建议 Exchange Type 设置为 fanout ,否则当 RoutingKey 依然不匹配就会被返回 Producer。
P.S. 有些书上讲备份交换器和 mandatory 参数一起使用 mandatory 参数失效是错的,当 RoutingKey 不匹配 Alternate Exchange 依然会被返回 Producer 。
(rabbitmq v3.7 测试)
MapPublish & Consume Publish Confirmarg = new HashMap () {{ put("alternate-exchange", "alt"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg); channel.exchangeDeclare("alt", "fanout", true, false, null); channel.queueDeclare("normalQueue", true, false, false, null); channel.queueDeclare("notSend", true, false, false, null); channel.queueBind("normalQueue", "normalExchange", "key"); channel.queueBind("notSend", "alt", "");
消息发送到服务器后可能还没来的及刷到磁盘中,服务器就挂掉,从而造成消息丢失。 Publish Confirm 能够在消息确实到达服务器(开启持久化的消息会在刷入磁盘之后)之后返回一个确认给 Publisher。
通过 channel.confirmSelected 把 Channel 设置为 Confirm 模式,并为 Channel 添加一个 ConfirmLister 来监听返回的确认。
SortedSetunconfirmedSet = new TreeSet<>(); channel.confirmSelect(); channel.addConfirmListener((deliveryTag, multiple) -> { System.out.println("handleAck: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }, (deliveryTag, multiple) -> { System.out.println("handleNack: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }); while (true) { long seq = channel.getNextPublishSeqNo(); channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8)); unconfirmedSet.add(seq); Thread.sleep(1000); }
除了异步处理的方式之外还有批量确认以及事务的方法。批量确认的速度在大量连续发送的情况下和异步的方法差不多。不管怎样这两种消息确认的方法都要比事务的方式快7倍左右。
Consumer一般应当实现 Consumer 接口或者继承 DefaultConsumer ,Consumer 通过 consumerTag 来进行区分。
消费消息有两种方式,一种是 Push ,一种是 Get。
Push 是由 RabbitMQ 以轮询的方式将消息推送到 Consumer ,方法为 basicConsume 。一般一个 Channel 对应一个 Consumer 。
Get 由客户端主动从 RabbitMQ 拉取一条消息,方法为 basicGet 。__不能循环执行 basicGet 来代替 basicConsume ,不然会严重影响性能。__
消息确认:autoAck 为 false ,RabbitMQ 会等待 basicAck 的显式确认。除非 Consumer 连接断开否则一直等待确认。当 Consumer 显式调用 basicReject 或者 basicNack 并将 requeue 设为 true 后会将消息重新入队投递。一般我们在业务处理完之后再 ack .
mandatory : 当 Exchange 无法匹配 Queue 或 Exchange 时,mandatory 为 true 的消息会被返回给 Producer,否则会被丢弃。 通过 Channel.addReturnListener 来添加 ReturnListener 监视器。
queueDeclare 时添加 x-message-ttl 参数,单位毫秒。
Maparg = new HashMap () {{ put("x-message-ttl", "1000000"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
使用 AMQP.BasicProperties.Builder 创建 AMQP.BasicProperties 并设置 expiration 参数。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("100000"); channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));Dead Letter Exchange (DLX)
Dead Letter(死信):
Basic.Reject / Basic.Nack 并且 requeue 为 true
消息 TTL 过期
队列达到最大长度
当消息成为 Dead Letter 之后, RabbitMQ 会自动把这个消息发到 DLX 上。
// 当发送到 normalQueue 中的消息成为 Dead Letter 之后会自动以 // dead-letter 为 routingKey 发送到 dlxQueue Exchange Maparg = new HashMap () {{ put("x-dead-letter-exchange", "dlx"); put("x-dead-letter-routing-key", "dead-letter"); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("dlx", "direct", true, false, false, null); channel.queueDeclare("dlxQueue", true, false, false, null);
DLX 其他用法:延迟队列,消息 发送到一个暂存的、没有 Consumer 的 Queue 并设置 TTL,Consumer 消费 DLX 绑定的 Queue 的消息,建议给暂存的 Queue 设置一个最大的 TTL,防止消息没有设置 TTL 而一直堆积在 Queue 中。
Priority消息的消费可以有优先级,Queue 的最大优先级可以通过 x-max-priority 进行设置。
MapDurabilityarg = new HashMap () {{ put("x-max-priority", 5); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("normalExchange", "direct", true, false, null); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(2); channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Exchange , Queue , 消息都可以进行持久化。在消息发送到 Exchange 之后会立刻路由到 Queue 中,因此未持久化的 Exchange 在重启后会丢失 Exchange 元数据以及绑定,对 Queue 和消息的持久化无影响。
未持久化的 Queue 在重启后会丢失,包括 Queue 中的消息,不管消息是否设置了持久化。
未持久化的消息在重启后会丢失,即使所在的 Queue 已持久化。
channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化 channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化 channel.queueBind("normalQueue", "normalExchange", "key"); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); // 消息持久化 channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));Qos
Qos 的作用时负载均衡。当一个队列有两个 Consumer ,一个性能很好 A,另一个不那么好 B,RabbitMQ 会轮询,将消息平均地分给这两个 Consumer。可见 B 上的堆积的消息会越来越多,而 A 上的线程可能会空闲。 Qos 的作用就是防止一个 Consumer 堆积了过多的消息,把这些消息分给其他 Consumer。
global 参数:
channel.basicQos(3, false); // each Consumer limit 3 channel.basicQos(5, true); // this channel limit to 5
global 参数会让 RabbitMQ 调用更多资源,尽量不要设置(默认值为 false)。
RelibilityRabbitMQ 支持最少一次和最多一次。
最少一次:
- 启用 Publisher Confirm 或者 事务保证消息能够到达服务器。 - 启用 mandatory 参数保证消息不回被 Exchange 丢掉。 - 消息和 Queue 开启持久化。 - Consumer autoAck off, 并确保消息在处理完之后再 ackPolicy
Policy 可以很方便的批量设置 Exchange 以及 Queue 的属性,但是 Policy 的优先级较低,请注意。
Policy 可以通过 HTTP API, web console,以及 cli 的方式。
rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
vhost : 指定 vhost
proiority : 如果一个 Queue 或者 Exchange 有多个 Policy 的情况下,只有 priority 最大的那个 Policy 才会生效。
apply-to : 应用到
Exchange and Queue
Exchange
Queue
name : Policy 的名字
pattern : Exchange 或者 Queue 名字的正则表达式
defination : 属性值,可以通过 management > Admin > Policies 的查看。
ClusterRabbitMQ 会把所有的元数据存储到所有的节点上,但是队列是分散在集群中所有的节点上的。
Build A Cluster with docker我们尝试使用 Docker Compose 创建一个由 3 个服务组成的集群
version: "3" services: node1: image: rabbitmq:3.7-management-alpine container_name: node1 hostname: node1 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5673:5672" - "15673:15672" node2: image: rabbitmq:3.7-management-alpine container_name: node2 hostname: node2 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5674:5672" - "15674:15672" node3: image: rabbitmq:3.7-management-alpine container_name: node3 hostname: node3 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5675:5672" - "15675:15672"
通过设置 hostname ,容器内部的 rabbitmq 的 nodename 就变成类似 rabbitmq@node1。同时集群中的 RabbitMQ 需要相同的 RABBITMQ_ERLANG_COOKIE 来进行互相认证。
启动服务:
docker-compose up -d
然后将 node2 , node3 加入 node1 ,注意,加入集群之前 RabbitMQ 必须停止:
# 停止 rabbitmq docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node3 rabbitmqctl stop_app # 加入 node1 docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1 docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1 # 重新启动 docker-compose exec node2 rabbitmqctl start_app docker-compose exec node3 rabbitmqctl start_app
在任意一个节点上查询集群状态:
docker-compose exec node2 rabbitmqctl cluster_status
可以看到如下状态:
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]}, {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]手动下线节点
将节点从在线状态下线, 首先停止节点,然后重置节点。
docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node2 rabbitmqctl reset docker-compose exec node2 rabbitmqctl stop_app
在重新启动服务器之后可以发现该节点已经脱离了集群。
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node2]}]}, {running_nodes,[rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node2,[]}]}]节点类型
RabbitMQ 的节点类型有两种,一种是 disc , 第二种是 ram。 RabbitMQ 要求集群中至少要有一个磁盘节点,储存了所有的元数据。当集群中的唯一一个磁盘节点崩溃后,集群可以继续收发消息,但是不能创建队列等操作。
RabbitMQ 在加入集群时默认为磁盘模式,如果要以内存模式加入:
docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram
更改节点类型:
docker-compose exec node 2 rabbitmqctl change cluster_node_type descMirror Queue
RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 机制。请注意,开启 Publisher Confirmed 或者事务的情况下,只有所有的 Slave 都 ACK 之后才会返回 ACK 给客户端。
开启 Mirror Queue 主要通过设置 Policy 其中最主要的是 defination:
ha-mode: Mirror Queue 的模式
all : 默认的模式,表示在集群中的所有节点上进行镜像
exactly : 在指定数量的节点上进行镜像,数量由 ha-params 指定。
nodes : 在指定的节点上进行镜像,节点名称由 ha-params 指定。
ha-params : 如上所述
ha-sync-mode : 消息的同步模式
automatic : 当新的 Slave 加入集群之后会自动同步消息。
manual: 默认,当加入新的 Slave 之后不会自动把消息同步到新的 Slave 上。指导调用命令显式同步。
ha-promote-on-shutdown:
when-synced: 默认,如果主动停止 master ,那么 slave 不会自动接管。也就是说会期望 master 会重启启动,这可以保证消息不会丢失。
always: 不管 master 是因为什么原因停止的,slave 会立刻接管,有可能有一部分数据没有从 master 同步到 slave.
ha-promote-on-failure: 默认 always ,不推荐设置为 when-synced
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75346.html
摘要:慕课网消息中间件极速入门与实战学习总结时间年月日星期三说明本文部分内容均来自慕课网。 慕课网《RabbitMQ消息中间件极速入门与实战》学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:无 学习源码:https://github.com/zccodere/s... 第一章:RabbitM...
摘要:基础教程注本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。安装库这里我们首先将消息推入队列,然后消费者从队列中去除消息进行消费。 RabbitMQ 基础教程(1) - Hello World 注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。 如果你喜欢我的文章,可以关注我的私人博客:http:...
摘要:添加应用启动类通过半自动刷新配置。配置客户端服务想要实现自动刷新配置的话,一端是不要做任何处理,只需要在一端处理即可。 SpringCloud(第 037 篇)通过bus/refresh半自动刷新ConfigClient配置 - 一、大致介绍 1、上章节我们讲到了手动刷新配置,但是我们假设如果微服务一多的话,那么我们是不是需要对每台服务进行手动刷新呢? 2、答案肯定是不需要的,我们也可...
摘要:消息丢失分成三种情况,可能出现生产者消费者。生产者丢失数据生产者丢失数据首先要确保写入的消息别丢,消息队列通过请求确认机制,保证消息的可靠传输。只有消息被持久化到磁盘以后,才会回传消息。消息丢失分成三种情况,可能出现生产者、RabbitMQ、消费者。 生产者丢失数据 首先要确保写入 RabbitMQ 的消息别丢,消息队列通过请求确认机制,保证消息的可靠传输。生产开启 comf...
阅读 822·2021-11-15 17:58
阅读 3611·2021-11-12 10:36
阅读 3753·2021-09-22 16:06
阅读 912·2021-09-10 10:50
阅读 1300·2019-08-30 11:19
阅读 3290·2019-08-29 16:26
阅读 905·2019-08-29 10:55
阅读 3317·2019-08-26 13:48