摘要:基础教程注本文是对众多博客的学习和总结,可能存在理解错误。消息的应答现在存在这样一种场景,消费者取到消息,然后创建任务开始执行。如果处理失败,也就是没有收到应答,那么就将这条消息重新发送给该队列的其他消费者。造成了负载不均衡。
RabbitMQ 基础教程(2) - Work Queue
注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。
如果你喜欢我的文章,可以关注我的私人博客:http://blog-qeesung.rhcloud.com/
在上一篇文章 RabbitMQ 基础教程(1) - Hello World 中,我们已经简单的介绍了RabbitMQ以及如何发送和接收一个消息。接下来我们将继续深入RabbitMQ,研究一下消息队列(Work Queue)
消息队列消息的发布者发布一个消息到消息队列中,然后信息的消费者取出消息进行消费。
queue +-------------+ +--+--+--+--+--+--+ +-------------+ | producer |----->|m1|m2| ... | | |---->| consumer | +-------------+ +--+--+--+--+--+--+ +-------------+
但是实际情况往往比这个要复杂,假如我们有多个信息的发布者和多个信息的消费者,那RabbitMQ又将会是怎么工作呢?
+--------------+ +--------------+ | producer1 +- / | consumer1 | +--------------+ - queue /- +--------------+ +--------------+ - +---+---+---+----+ /- +--------------+ | producer2 +---->X|m1 |m2 |m3 |... |---->| consumer2 | +--------------+ /- +---+---+---+----+ - +--------------+ +--------------+ /- - +--------------+ | ... |/ | ... | +--------------+ +--------------+Round-robin 分发算法
RabbitMQ中,如果有多个消费者同时消费同一个消息队列,那么就通过Round-robin算法将消息队列中的消息均匀的分配给每一个消费者。
这个算法其实很简单,每收到一个新的消息,就将这个消息分发给上下一个消费者。比如上一个消费者是consumer-n,那么有新消息来的时候就将这个新的消息发布到consumer-n+1,以此类推,如果到了最后一个消费者,那么就又从第一个开始。即:consumer-index = (consumer-index + 1) mod consumer-number
为了演示,首先来做几项准备工作。
定义任务 task.js
/** * 创建一个任务 * @param taskName 任务名字 * @param costTime 任务话费的时间 * @param callback 任务结束以后的回调函数 * @constructor */ function Task(taskName ,costTime , callback){ if(typeof(costTime) !== "number") costTime = 0; // no delay there setTimeout(function () { console.log(taskName+" finished"); if(callback && typeof (callback) === "function") callback(); } , 1000*costTime); };
串行化的消息任务结构
任务发布者负责将该结构发布到队列中,然后消费者取出消息,新建任务开始执行。
{ taskName : "taskname", costTime : 1 }
创建任务消息 task-producer.js
var amqp = require("amqplib/callback_api"); // 连接上RabbitMQ服务器 amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; // 得到发送消息的数目,默认发送4个 var name; var cost; (function () { if(process.argv.length < 4 ) { console.error("ERROR : usage - node rabbit-producer"); process.exit(-1); } name = process.argv[2]; cost = +process.argv[3]; })(); // 新建队列,然后将队列中的消息持久化取消 ch.assertQueue(q, {durable: true}); // 将任务串行化存入Buffer中,并推入队列 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true}); console.log(" [x] Sent "+name); setTimeout(function () { process.exit(0); },500); }); });
消费任务消息 task-consumer.js
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 监听队列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定义新的任务 new Task(obj.taskName,obj.costTime); }, {noAck: true}); }); });
现在开启两个消费者进程来等待消费tasks队列中的消息
# shell1 node task-consumer.js # shell2 node task-consumer.js
然后向队列中推入三个消息
# shell3 node task-producer.js task1 0 node task-producer.js task2 0 node task-producer.js task3 0
运行结果
# shell1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished # shell2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 已经通过Round-robin算法将消息队列中的消息分配到连接的消费者中了.消息,队列持久化
细心的读者可能已经发现了我们在声明队列和发送消息的代码块中改动了一小部分的代码,那就是
// 声明队列 ch.assertQueue(q, {durable: true}); // 发送信息 ch.sendToQueue(q, new Buffer(JSON.stringify({taskName :name ,costTime :cost })),{persistent:true});
通过将队列的durable配置参数生命为true可以保证在RabbitMQ服务器退出或者异常终止的情况下不会丢失消息队列,注意这里只是不会丢失消息队列,并不是消息队列中没有被消费的消息不会丢失。
为了保证消息队列中的消息不会丢失,就需要在发送消息时指定persistent选项,这里并不能百分之百的保证消息不会丢失,因为从队列中有新的消息,到将队列中消息持久化到磁盘这一段时间之内是无法保证的。
消息的应答现在存在这样一种场景,消费者取到消息,然后创建任务开始执行。但是任务执行到一半就抛出异常,那么这个任务算是没有被成功执行的。
在我们之前的代码实现中,都是消息队列中有新的消息,马上就这个消息分配给消费者消费,不管消费者对消息处理结果如何,消息队列会马上将已经分配的消息从消息队列中删除。如果这个任务非常重要,或者一定要执行成功,那么一旦任务在执行过程中抛出异常,那么这个任务就再也找不回来了,这是非常可怕的事情。
还好在RabbitMQ中我们可以为已经分配的消息和消息队列之间创建一个应答关系:
如果消息处理成功,那么就发送一个答复给消息队列,告诉它:我已经成功处理消息,不再需要这条消息了,你可以删除了,于是消息队列就将已经应答的消息从消息队列中删除。
如果处理失败,也就是没有收到应答,那么就将这条消息重新发送给该队列的其他消费者。
要在消费者和消息队列之间建立这种应答关系我们只需要将channel的consume函数的noAck参数设成false就可以了。
ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定义新的任务 new Task(obj.taskName,obj.costTime); }, {noAck: false}); // 这里设置成false
下面我们就模拟一下消息处理失败的场景:
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 监听队列上面的消息 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); // 定义新的任务 new Task(obj.taskName,obj.costTime,function(){ if(obj.taskName === "task2") throw new Error("Test error"); else ch.ack(msg); }); // 如果是任务二,那么就抛出异常。 }, {noAck: false}); }); });
按照上面的脚本执行顺序,我们在执行一遍脚本: consumer2得到执行task2消息,然后马上抛出异常退出进行,然后消息队列再将这个消息分配给cosumer1,接着也执行失败了,退出进程,最终消息队列中将只会有一个task2的消息存在。
启动消费者等待消息
# shell1 开启消费者1 node rabbit-consumer.js # shell2 开启消费者2 node rabbit-consumer.js
创建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 10 node rabbit-producer.js task3 0
我们能来看一下结果:
# shell2 消费者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished # 消费者2执行任务2的时候抛出异常,task2将会重新发送给消费者1 ... throw new Error("Error test"); # shell1 消费者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 task1 finished Get the task task3 task3 finished Get the task task2 # 消费者1接收到任何2 task2 finished ... throw new Error("Error test"); # 也抛出异常了
最终会在消息队列中剩下一条未消费的信息。
更加均衡的负载这里有一点需要注意,如果你将noAck选项设置成了false,那么如果消息处理成功,一定要进行应答,负责消息队列中的消息会越来越多,直到撑爆内存。
在上文中我们听到过消息队列通过Round-robin算法来将消息分配给消费者,但是这个分配过程是盲目的。比如现在有两个消费者,consumer1和consumer2,按照Round-robin算法就会将奇数编号的任务发配给consumer1,将偶数编号的任务分配给consumer2,但是这些任务恰好有一个特性,奇数编号的任务比较繁重,而偶数编号的任务就比较简单。
那么这就会造成一个问题,那就是consumer1会被累死,而consumer2会被闲死。造成了负载不均衡。要是每一个消息都被成功消费以后告诉消息队列,然后消息队列再将新的消息分配给空闲下来的消费者不就好了。
RabbitMQ中的确有这样的一个配置选项。那就是ch.prefetch(1);
我们现在就来模拟一下
var amqp = require("amqplib/callback_api"); var Task = require("./task.js"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var q = "tasks"; ch.assertQueue(q, {durable: true}); console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q); // 监听队列上面的消息 ch.prefetch(1); // 添加这一行 ch.consume(q, function(msg) { var obj = JSON.parse(msg.content.toString("utf8")); console.log("Get the task "+obj.taskName); new Task(obj.taskName,obj.costTime ,function () { ch.ack(msg); }); }, {noAck: false}); }); });
启动消费者等待消息
# shell1 开启消费者1 node rabbit-consumer.js # shell2 开启消费者2 node rabbit-consumer.js
创建消息
node rabbit-producer.js task1 0 node rabbit-producer.js task2 20 node rabbit-producer.js task3 0 node rabbit-producer.js task4 20
# shell1 开启消费者1 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task1 # 任务马上结束 task1 finished Get the task task3 # 任务马上结束 task3 finished Get the task task4 # 任务四被分配到consumer1中了 task4 finished # shell2 开启消费者2 [*] Waiting for messages in tasks. To exit press CTRL+C Get the task task2 task2 finished
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/86315.html
摘要:平均每个消费者将得到相同数量的消息。消息确认完成任务可能需要几秒钟。为了确保消息不会丢失,支持消息确认。没有任何消息超时当这个消费者中止了,将会重新分配消息时。这是因为只是调度消息时,消息进入队列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介绍 在上一个 Hello World 教程中,我们编写了从指定队列发送...
摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我们写了通过一个...
摘要:在中间的框是一个队列的消息缓冲区,保持代表的消费。本教程介绍,这是一个开放的通用的协议消息。我们将在本教程中使用,解决依赖管理。发送者将连接到,发送一条消息,然后退出。注意,这与发送发布的队列匹配。 介绍 RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就...
摘要:这样的消息分发机制称作轮询。在进程挂了之后,所有的未被确认的消息会被重新分发。忘记确认这是一个普遍的错误,丢失。为了使消息不会丢失,两件事情需要确保,我们需要持久化队列和消息。 工作队列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我们写了一个程序从已经声明的队列中收发...
摘要:消息持久化控制的属性就是消息的持久化。当生产者发送的消息路由键为时,两个消费者都会收到消息并处理当生产者发送的消息路由键为时,只有消费者可以接收到消息。八的消息确认机制在中,可以通过持久化数据解决服务器异常的数据丢失问题。 一、内容大纲&使用场景 1. 消息队列解决了什么问题? 异步处理 应用解耦 流量削锋 日志处理 ...... 2. rabbitMQ安装与配置 3. Java操...
阅读 3126·2021-10-08 10:04
阅读 1038·2021-09-30 09:48
阅读 3427·2021-09-22 10:53
阅读 1629·2021-09-10 11:22
阅读 1660·2021-09-06 15:00
阅读 2122·2019-08-30 15:56
阅读 685·2019-08-30 15:53
阅读 2220·2019-08-30 13:04