摘要:这样的消息分发机制称作轮询。在进程挂了之后,所有的未被确认的消息会被重新分发。忘记确认这是一个普遍的错误,丢失。为了使消息不会丢失,两件事情需要确保,我们需要持久化队列和消息。
工作队列
在第一篇中,我们写了一个程序从已经声明的队列中收发消息,在这篇中,我们会创建一个工作队列(Work Queue)来分发works里面的耗时任务。
其主要思想就是避免立即执行耗资源的任务,并等待它完成。相反的,我们要让这些任务在稍后的一个时间执行。我们把任务封装成一个消息放到队列中。
一个工作进程会在后台执行,取出(Pop)任务并最终会完成这项任务,当你运行多个work的时候,这些任务会在它们之间共享。
这个概念在web应用中也是非常有用的,当在一个http请求窗口中不可能完成一个复杂的任务时候。
准备在之前的引导中,我们发送了一个’Hello World‘的消息。现在我们要发送一个字符串代表一个复杂的任务,我们没有像调整
图片大小或者渲染一个pdf文件这样的在真实场景中的任务,所以我们使用setTimeout来模拟我们正处于忙碌状态。我们把‘.’的数量代表这个字符串的复杂度;
每一个‘." 会消耗一秒钟,例:一个模拟的任务"Hello..." 会消耗三秒钟。
从之前的例子,我们稍稍修改一下send.js 的代码,允许命令行可以发送任意的消息。这个程序在工作队列中安排好任务,所以我们称它new_task.js
var q = "task_queue"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent "%s"", msg);
我们的之前的receive.js同样需要一些改变,需要对消息内容中的每个"."模拟成一个会消耗一秒的任务。它要从队列中取出一条消息并执行这个任务,我们把它称作worker.js
ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); }, secs * 1000); }, {noAck: true});
注意我们模拟的执行时间
执行我们的程序
shell1$ ./worker.js shell2$ ./new_task.js循环调度
使用任务队列(Task Queue)的其中的一个优势是有简化并行工作的能力。如果我们有很多堆积的未完成的任务,我们只需添加更多的worker来进行扩展。
首先,我们尝试同时启动两个worker.js,他们都会从队列中受到消息,但是实际上呢?我们来看看
你需要打开第三个命令行,两个来运行worker.js脚本,我们称作C1,C2
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C
在第三个命令行工具中,我们会发布新的任务,一旦你启动消费者,你可以发布一些消息:
shell3$ ./new_task.js First message. shell3$ ./new_task.js Second message.. shell3$ ./new_task.js Third message... shell3$ ./new_task.js Fourth message.... shell3$ ./new_task.js Fifth message.....
让我们看看什么被分发到我们的worker
shell1$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received "First message." [x] Received "Third message..." [x] Received "Fifth message....."
shell2$ ./worker.js [*] Waiting for messages. To exit press CTRL+C [x] Received "Second message.." [x] Received "Fourth message...."
默认情况下,RabbitMQ会依次地把消息推送到下一个消费者,平均每个消费者会得到相同数量的消息。这样的消息分发机制称作轮询。可以尝试3个或更多的worker。
## 消息确认 (Message acknowledgment)
要完成一个任务需要一些事件,你可能会想,当一个消费者开始执行一个长的任务但只执行一部分就die了会发生什么。就我们当前的代码,一旦RabbitMQ 分发了一条消息到消费者那边,就会立即从存储中移除这条消息。这样的话,如果你杀掉了进程,我们将会丢失这条正在被处理的消息。
我们也同样丢失了我们发送给这个进程的但还没被处理的消息。
但是我们不想丢失任何的任务,如果一个进程挂掉,我们希望这个任务会被分发到其他的进程。
为了确保每一条消息绝不会丢失,RabbitMQ支持 消息确认,一个ack标志会从消费者那边返回去通知RabbitMQ当前的这个消息已经收到并且已经完成,于是RabbitMQ就可以取删掉这个任务了。
如果一个消费者挂了(通道被关闭,连接关闭,或者TCP连接丢失)而没有发送ack标志,RabbitMQ会明白这条任务还没被执行完,并会重新放回队列中,如果当时有其他的消费者在线,这个消息会被快速地发送给其他的消费者。这样的话你就可以保证没有消息会遗失,即使进程只是偶尔会挂掉。
不管消息处理是否超时,RabbitMQ只会在消费者挂掉的时候重新分发消息。这对于那些要处理很久很久的消息也是好的(add:不会被判定为noack,而重新分发)
在之前的例子中,消息确认是被关闭的,是时候打开它了,使用{noAck: false}(你也可以移除这个操作选项)选项,当我们完成这个任务的时候发送一个正确的消息确认。
ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false});
使用这样的代码,你可以确定即使在它还在处理消息的时候你使用CTRL+C 杀掉进程也不会有数据丢失。在进程挂了之后,所有的未被确认的消息会被重新分发。
## 忘记确认 这是一个普遍的错误,丢失ack。只是一个简单的错误,但结果确实很严重的。当客户端停止的时候,消息会被重新分发(像是被随机分发),但是RabbitMQ会占用越来越多的内存当它不能取释放掉任何未被确认的消息。 为了调试这种类型的错误,你可以使用`rabbitmqctl`来输出未被确认的消息字段: $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.消息持久化
我们学习了确保在进程挂掉仍保证任务不会被丢失,但我们的任务还是会在RabbitMQ服务停止的时候丢失。
当RabbitMQ 退出或者崩溃,除非你叫它不要丢失,不然队列和消息都会丢失。为了使消息不会丢失,两件事情需要确保,我们需要持久化队列和消息。
首先,我们要让RabbitMQ 不会丢失队列,为此,我们要先声明
ch.assertQueue("hello", {durable: true});
尽管这样的操作是对的,但是在我们现在的配置中是不起作用的,这是因为我们已经定义了一个未持久化的叫做hello的队列,RabbitMQ不允许你改变一个已经存在的队列的参数,如果你这样做,程序将会返回错误。
但是有一个快速的办法 --- 让我们定义一个新的队列,叫做task_queue
ch.assertQueue("task_queue", {durable: true});
这个durable选项,需要消费者和生产者都去使用。
此时我们能保证task_queue队列不会在RabbitMQ重启的时候丢失,现在我们需要对消息进行持久化 --- 使用presistent的Channel.sendToQueue选项,
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
注意:消息持久化 消息持久化,不能完全地保证消息不会丢失,尽管它告诉RabbitMQ要把消息存到磁盘当中,总存在一个RabbitMQ接收到消息,但还未处理完的情况。另外,RabbitMQ并不是对每个消息做到帧同步,有可能只是被写到缓存中,还没被写到磁盘。 消息持久化不能完全保证,但已经远远满足我们的简单的工作队列的需求,如果你需要更强的持久化的保证,你可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)。均衡调度(Fair dispatch)
你可能已经注意到现在的调度并不是我们想要的,例:在有两个worker的情况下,当所有的奇数消息都是重的而偶数消息是轻量的,那会有一个worker会一直处于忙碌状态,而另一个worker几乎不工作,
RabbitMQ,并不知道这些情况,只知道持续地均匀地分发消息。
这样发生的原因是RabbitMQ只是在消息进入队列的时候进行分发的工作,不管消费者的未确认的消息的数量,只是一味地分发第N条消息给第N个消费者。
为了解决这样的问题,我们使用方法prefetch,并设置值为1,表示RabbitMQ不会同时给一个worker超过一条消息,即,不会分发一条新的消息直到worker完成并且发送ack标志。否则,RabbitMQ会把消息发送给下一个不在忙碌状态的worker.
ch.prefetch(1);
注意队列的大小 如果所有的worker都处于忙碌状态,你的队列可以被填满,你可能需要一个监控,或者添加更多的worker,或者有其他的解决方案。整合
最后的new_task.js的代码:
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "task_queue"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertQueue(q, {durable: true}); ch.sendToQueue(q, new Buffer(msg), {persistent: true}); console.log(" [x] Sent "%s"", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
new_task.js source
worker.js的代码:
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "task_queue"; ch.assertQueue(q, {durable: true}); ch.prefetch(1); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); ch.consume(q, function(msg) { var secs = msg.content.toString().split(".").length - 1; console.log(" [x] Received %s", msg.content.toString()); setTimeout(function() { console.log(" [x] Done"); ch.ack(msg); }, secs * 1000); }, {noAck: false}); }); });
worker.js source
使用消息确认和预处理,你可以建立一个工作队列。持久化选项使得消息可以在RabbitMQ会重启的情况下得以保留。
获得更多的关于Channel的方法和消息的属性,你可以浏览amqplib docs
翻译:Joursion
日期 :2016/12/25
欢迎交流,学习。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/88111.html
摘要:生产者只能把消息发到交换器。是否要追加到一个特殊的队列是否要追加到许多的队列或者丢掉这条消息这些规则被定义为交换类型。有一点很关键,向不存在的交换器发布消息是被禁止的。如果仍然没有队列绑定交换器,消息会丢失。 发布与订阅 (Publish/Subscribe) 在之前的章节中,我们创建了工作队列,之前的工作队列的假设是每个任务只被分发到一个worker。在这一节中,我们会做一些完全不一...
摘要:允许接收和转发消息。一个等待接收消息的程序是一个消费者。发送者会先连接到发送一条消息,然后退出。注意这里的是要和之前的名称一致。翻译日期另因为想入门第一次想着翻译,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我们写了通过一个...
摘要:发布订阅模式在之前的文章里,创建了。我们称之为发布订阅模式。其实我们是用到了默认的,用空字符串来标识。空字符串代表了没有名字的被路由到了由指定名字的。和这种关系的建立我们称之为从现在开始这个就会将推向我们的队列了。 发布订阅模式 在之前的文章里,创建了work queue。work queue中,每一个task都会派发给一个worker。在本章中,我们会完成完全不一样的事情 - 我们会...
摘要:平均每个消费者将得到相同数量的消息。消息确认完成任务可能需要几秒钟。为了确保消息不会丢失,支持消息确认。没有任何消息超时当这个消费者中止了,将会重新分配消息时。这是因为只是调度消息时,消息进入队列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介绍 在上一个 Hello World 教程中,我们编写了从指定队列发送...
阅读 3087·2023-04-25 15:02
阅读 2726·2021-11-23 09:51
阅读 1982·2021-09-27 13:47
阅读 1943·2021-09-13 10:33
阅读 883·2019-08-30 15:54
阅读 2615·2019-08-30 15:53
阅读 2818·2019-08-29 13:58
阅读 824·2019-08-29 13:54