摘要:为了解决以上问题,我们需要使用的生产者确认模式。在这样的机制下,即使有一个消费者崩溃也不会丢失任何消息。即使处理一条消息会花费很长的时间。一些问题这个库提供了心跳检测的功能选项,但是没有做自动重连的。参考文章深入学习四的模式
数据的持久化
对于非常健壮稳定的后台系统,我们必须得考虑到各种宕机的情况:物理宕机,应用自身出错崩溃等,而这个时候我们的应用需要做到重启后数据依旧不丢失,这个问题就是数据持久化,也就是说数据持久化到了磁盘。
在RabbitMQ中,如果要保证消息发送到broker,我们首先需要做到三点
持久化的exchange(交换器):声明时开启durable选项
持久化的queue(队列):声明时开启durable选项
持久化的message:delivery_mode设置为2(php,python之类的库,2可以换成更友好的常量),在node的amqp.node库中是设置persistent为true
需要注意的一点是,持久化会造成性能损耗(写磁盘操作),但为了保证生产环境的数据一致性,我们必须这么做。
发送消息的confirm机制其实光光做到以上三点,数据依旧有丢失的可能,因为在客户端成功调用api存入消息之后,RabbitMQ还需要一段时间(很短,但不可忽略)才能落盘,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,而在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。
为了解决以上问题,我们需要使用RabbitMQ的生产者确认模式。
为了开启确认模式,需要生产者将channel设置成confirm模式,一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息 (来自参考1)简单confirm示例
示例代码使用NodeJS实现,RabbitMQ服务可以使用上一篇RabbitMQ二三事的docker-compose.yml快速启动
const QUEUE_NAME = "test_queue" const config = require("./config") const amqp = require("amqplib") async function getMQConnection() { return await amqp.connect({ protocol: "amqp", hostname: config.MQ.host, port: config.MQ.port, username: config.MQ.user, password: config.MQ.pass, locale: "en_US", frameMax: 0, heartbeat: 5, // 心跳 vhost: config.MQ.vhost, }) } async function run(rmqConn, msgArr) { try { const channel = await rmqConn.createConfirmChannel() // 开启confirm const exchangeName = `${QUEUE_NAME}_exchange` await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false }) // 不存在exchange就新建exchange await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) // 不存在queue就新建 await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) // 绑定交换器 // queue name当routing key msgArr.forEach(str => { channel.publish(exchangeName, QUEUE_NAME, Buffer.from(str), { persistent: true, mandatory: true }) }) await channel.waitForConfirms() console.log("发送批量数据成功") await channel.close() } catch(err) { // do something with err console.log("发送批量数据失败:" + err.message) } } async function testSendBatchMsg() { const conn = await getMQConnection() await run(conn, [ "cat", "dog", "pig", "mouse", "mouse", "penguin" ]) await conn.close() } testSendBatchMsg()说明
assertExchange和assertQueue是保证交换器和队列一定存在,这里的exchange是简单的direct交换器
ConfirmChannel#publish方法不返回promise
现在我们需要考虑我们的消费者了,消费者也会遇到程序出错或者物理宕机问题,RabbitMQ官方也给出了一套解决方案,和confirm机制类似,就是ack机制(Message acknowledgment).
在ack机制中,消费者在自己处理完业务逻辑后,需要发送一个ack消息,然后broker才认为这条消息被正确消费,然后从内存和磁盘中移除掉它,只要没收到消费者的acknowledgment,broker就会一直保存着这条消息.如果一个消费者崩溃(断开了连接)却没有发送ack,broker会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。在这样的机制下,即使有一个消费者崩溃也不会丢失任何消息。
const QUEUE_NAME = "test_queue" const config = require("./config") const amqp = require("amqplib") async function getMQConnection() { return await amqp.connect({ protocol: "amqp", hostname: config.MQ.host, port: config.MQ.port, username: config.MQ.user, password: config.MQ.pass, locale: "en_US", frameMax: 0, heartbeat: 5, // 心跳 vhost: config.MQ.vhost, }) } async function sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)) } async function start() { const mqConn = await getMQConnection() console.log("connecting RabbitMQ successfully!") const channel = await mqConn.createChannel() const exchangeName = `${QUEUE_NAME}_exchange` await channel.assertExchange(exchangeName, "direct", { durable: true, autoDelete: false }) await channel.assertQueue(QUEUE_NAME, {durable: true, autoDelete: false}) await channel.bindQueue(QUEUE_NAME, exchangeName, QUEUE_NAME) channel.consume(QUEUE_NAME, async function(msg) { console.log("Received msg: %s from %s", QUEUE_NAME, msg.content.toString()) console.log("consuming message...") try { await sleep(500) // 模拟消费消息 console.log("consuming ends") channel.ack(msg) // 消费成功,发送ack } catch(e) { console.log("consuming failed: " + e.message) channel.nack(msg) // 消费失败,发送nack } }, {noAck: false}) // ack } start()注意
自动ack是默认打开的,也就是说消息发送到消费者的时候就被自动ack了,而很多情况下,我们想要手动ack,所以我们需要显式设置autoAsk=false关闭这种机制(在示例中是noAck: false)
ack没有任何超时限制;只有当消费者断开时,broker才会重新投递。即使处理一条消息会花费很长的时间。
一些问题amqp.node这个库提供了心跳检测的功能(heartbeat选项),但是没有做自动重连的。
对于heartbeat的值,RabbitMQ官网有说明
Several years worth of feedback from the users and client library
maintainers suggest that values lower than 5 seconds are fairly likely
to cause false positives, and values of 1 second or lower are very
likely to do so. Values within the 5 to 20 seconds range are optimal
for most environments.
所以心跳不宜设置的太低(因为短暂的网络拥塞或者流控制),太低容易导致误报,根据经验5s-20s是比较合理的。
参考文章:
深入学习RabbitMQ(四):channel的confirm模式
when-publishes-are-confirmed
Channel-oriented API reference
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/31527.html
摘要:前端之前端之前言前言昨天学习了标记式语言,也就是无逻辑语言。今天学习,被称之为网页的化妆师。为前端页面的样式,由选择器作用域与样式块组成。年初,组织负责的工作组开始讨论第一版中没有涉及到的问题。其讨论结果组成了年月出版的规范第二版。前端之 CSS 前言 昨天学习了标记式语言,也就是无逻辑语言。了解了网页的骨架是什么构成的,了解了常用标签,两个指令以及转义字符;其中标签可以分为两大类: 一类...
摘要:初始状态对应二叉树结构将顶点与最后一个结点调换即将顶点与最后一个结点交换,然后将索引为止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、PriorityBlockingQueue简介 PriorityBlockin...
摘要:第一步安装因为是语言编写的,所以我们首先需要安装第二步安装官网提供的安装方式本人安装成功的方式第三步查看是否已经安装好了,能查到说明已经安装完成了。 第一步:安装Erlang 因为rabbitMQ是Erlang语言编写的,所以我们首先需要安装Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...
阅读 1608·2021-09-30 09:47
阅读 3618·2021-09-22 15:05
阅读 2849·2021-08-30 09:44
阅读 3627·2019-08-30 15:55
阅读 1378·2019-08-30 13:08
阅读 1334·2019-08-29 16:40
阅读 557·2019-08-29 12:45
阅读 1394·2019-08-29 11:25