摘要:平均每个消费者将得到相同数量的消息。消息确认完成任务可能需要几秒钟。为了确保消息不会丢失,支持消息确认。没有任何消息超时当这个消费者中止了,将会重新分配消息时。这是因为只是调度消息时,消息进入队列。
介绍
在上一个 Hello World 教程中,我们编写了从指定队列发送和接收消息的程序。在这篇文章中,我们将创建一个工作队列,用于在多个工人(消费者)之间分配耗时的任务。
工作队列(又名任务队列)背后的主要思想是避免立即执行资源密集型任务,必须等待它完成。相反,我们计划稍后完成任务。我们将任务封装为消息并将其发送到队列中。后台运行的一个工作进程将弹出任务并最终执行该任务。当你运行许多工人(消费者)时,任务将在他们之间分担。
这个概念在Web应用程序中尤其有用,因为在短HTTP请求中不可能处理复杂的任务。
先决条件在本教程的前一部分,我们发送了一条包含“Hello World”的消息。现在,我们将发送支持复杂任务的字符串。我们没有一个真实环境的任务,如图像进行调整或PDF文件的渲染,让我们利用sleep()模拟真实环境的业务功能。我们将字符串中的点数作为其复杂度;每个点都将占“工作”的一秒钟。例如,由Hello...描述的一个伪任务…需要三秒。
new_task.php我们会稍微修改send.php代码从我们先前的例子,允许任意的消息是从命令行发送。这一计划将任务分配给我们的工作队列,所以我们命名它 new_task.php:
$data = implode(" ", array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, "", "hello"); echo " [x] Sent ", $data, " ";
我们的上一个版本的receive.php脚本也需要一些改变:它需要假第二工作在消息体中每一点。它会从队列弹出消息和执行任务,所以让我们把命名worker.php:
$callback = function($msg){ echo " [x] Received ", $msg->body, " "; //根据"."数量个数获取延迟时间,单位秒 sleep(substr_count($msg->body, ".")); //模拟业务执行时间延迟 echo " [x] Done", " "; }; $channel->basic_consume("hello", "", false, true, false, false, $callback);单worker简单运行测试 消费者
php worker.php消息生产者
php new_task.php "A very hard task which takes two seconds.."循环调度
一个使用任务队列的优点是容易并行工作的能力。如果我们积压了大量的工作,我们可以增加更多的工人,这样就可以轻松地规模化。
首先,让我们尝试同时运行两worker.php脚本。他们都会从队列中获得消息,看看效果如何?让我们看看。
你需要打开三个console命令。两将运行worker.php脚本。这些控制台将是我们的两个消费者C1和C2。
消费者1php worker.php消费者2
php worker.php消息生产者
php new_task.php msg1...
默认情况下,RabbitMQ将会发送的每一条消息给下一个消费者,在序列。平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环轮询。试着用三个或更多的工人。
消息确认完成任务可能需要几秒钟。你可能遇到如果一个消费者开始一个长期的任务,并且只完成了部分任务,那么会发生什么?。我们目前的代码,一旦RabbitMQ发送一个消息给客户立即标记为删除。在这种情况下,如果您中止一个消费者,我们将丢失它正在处理的消息。我们还将丢失发送给该消费者所有的尚未处理的消息。
如果我们不想失去任何任务。如果一个消费者意外中止了,我们希望把任务交给另一个消费者。
为了确保消息不会丢失,RabbitMQ支持消息确认。ACK(nowledgement)消费者返回的结果告诉RabbitMQ有一条消息收到,你可以自由可控的删除他
如果一个消费者中止了(其通道关闭,连接被关闭,或TCP连接丢失)不发送ACK,RabbitMQ将会理解这个消息并没有完全处理,将它重新加入队列。如果有其他用户同时在线,它就会快速地传递到另一个消费者。这样,即使意外中止了,也可以确保没有丢失信息。
没有任何消息超时;当这个消费者中止了,RabbitMQ将会重新分配消息时。即使处理消息花费很长很长时间也很好。
消息确认是默认关闭。可通过设置的第四个参数basic_consume设置为false(true意味着没有ACK)和从消费者发送合适的确认,一旦我们完成一个任务。
$callback = function($msg){ echo " [x] Received ", $msg->body, " "; sleep(substr_count($msg->body, ".")); echo " [x] Done", " "; $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; $channel->basic_consume("task_queue", "", false, false, false, false, $callback);
使用此代码,我们可以确信,即使在处理消息时使用Ctrl + C杀死一名消费者,也不会丢失任何东西。消费者中止都未确认的消息后很快会被重新分配。
忘了确认(Forgotten acknowledgment)丢失ACK确认是一个常见的错误。这是一个容易犯的错误,但后果很严重。当你的客户退出,消息会被重新分配(这可能看起来像是随机的分配),RabbitMQ将会消耗更多的内存,它不会释放任何延迟确认消息。
为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段:
rabbitmqctl list_queues name messages_ready messages_unacknowledged消息持久化(Message durability)
我们已经学会了如何确保即使消费者死了,任务也不会丢失。但是如果RabbitMQ服务器停止了,我们的任务仍然有可能会丢失。
当RabbitMQ退出或崩溃了,会丢失队列和消息除非你不要。要确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保RabbitMQ永远不会丢失队列。为了做到这一点,我们需要声明它是持久的。为此我们通过queue_declare作为第三参数为true:
$channel->queue_declare("hello", false, true, false, false);
虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,该队列不持久。RabbitMQ不允许你重新定义现有队列用不同的参数,将返回一个错误的任何程序,试图这么做。但有一个快速的解决方法-让我们声明一个名称不同的队列,例如task_queue:
$channel->queue_declare("task_queue", false, true, false, false);
需要应用到生产者和消费者代码中设置为true。
在这一点上,我们可以确保即使RabbitMQ重启了,task_queue队列不会丢失。现在我们要标记我们的消息持续通过设置delivery_mode = 2消息属性,amqpmessage作为属性数组的一部分。
$msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) );关于消息持久性的说明(Note on message persistence)
将消息标记为持久性不能完全保证消息不会丢失。虽然它告诉RabbitMQ保存信息到磁盘上,还有一个短的时间窗口时,RabbitMQ 已经接受信息并没有保存它。另外,RabbitMQ不做fsync(2)每一个消息--它可能只是保存到缓存并没有真正写入到磁盘。持久性保证不强,但对于我们的简单任务队列来说已经足够了。如果你需要更强的保证,那么你可以使用消费者确认。
公平调度您可能已经注意到,调度仍然不完全按照我们的要求工作。例如,在一个有两个消费者的情况下,当所有的奇数信息都很重,甚至很轻的消息,一个消费者会一直忙,而另一个消费者几乎不做任何工作。嗯,RabbitMQ不知道发生了什么事,仍将均匀消息发送。
这是因为RabbitMQ只是调度消息时,消息进入队列。当存在未确认的消息时。它只是盲目的分发n-th条消息给n-th个消费者。
为了改变这个分配方式,我们可以调用basic_qos方法,设置参数prefetch_count = 1。这告诉RabbitMQ不要在一个时间给一个消费者多个消息。或者,换句话说,在处理和确认以前的消息之前,不要向消费者发送新消息。相反,它将发送给下一个仍然不忙的消费者。
$channel->basic_qos(null, 1, null);
关于队列大小的注释(Note about queue size)源码 new_task.php如果所有的消费者都很忙,你的队列填满了。你会想留意到这一点,也许增加更多的工人,或者有其他的策略。
channel(); $channel->queue_declare("task_queue", false, true, false, false); $data = implode(" ", array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data, array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, "", "task_queue"); echo " [x] Sent ", $data, " "; $channel->close(); $connection->close(); ?>worker.php
channel(); $channel->queue_declare("task_queue", false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg){ echo " [x] Received ", $msg->body, " "; sleep(substr_count($msg->body, ".")); echo " [x] Done", " "; $msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]); }; $channel->basic_qos(null, 1, null); $channel->basic_consume("task_queue", "", false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>
使用消息的确认和预取,你可以设置一个工作队列。耐久性的配置选项让任务存在,即使RabbitMQ重启。
学习如何向许多消费者传递同样的信息, 你可以阅读下一章节:RabbitMQ+PHP 教程三(Publish/Subscribe)。
翻译来自 RabbitMQ - RabbitMQ tutorial - Work Queues
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/26023.html
摘要:在中间的框是一个队列的消息缓冲区,保持代表的消费。本教程介绍,这是一个开放的通用的协议消息。我们将在本教程中使用,解决依赖管理。发送者将连接到,发送一条消息,然后退出。注意,这与发送发布的队列匹配。 介绍 RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就...
摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我们写了通过一个...
摘要:参考文档依赖包安装环境配置环境变量增加内容保存退出,并刷新变量测试是否安装成功安装完成以后,执行看是否能打开,用退出,注意后面的点号,那是的结束符。 参考文档:http://www.cnblogs.com/phpinfo/p/4104551...http://blog.csdn.net/historyasamirror/ar... 依赖包安装 yum install ncurses-d...
摘要:在客户端中,当我们将队列名称作为空字符串提供时,我们创建一个带有生成名称的非持久队列方法返回时,变量包含一个随机生成的队列名称。交换和队列之间的关系称为绑定。 使用 php-amqplib 介绍 在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是每个任务都交付给一个工作人员处理。在这一部分中,我们将做一些完全不同的事情——我们将向多个消费者发送消息。此模式称为发布/订阅。 ...
摘要:这样的消息分发机制称作轮询。在进程挂了之后,所有的未被确认的消息会被重新分发。忘记确认这是一个普遍的错误,丢失。为了使消息不会丢失,两件事情需要确保,我们需要持久化队列和消息。 工作队列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我们写了一个程序从已经声明的队列中收发...
阅读 961·2023-04-26 02:49
阅读 1171·2021-11-25 09:43
阅读 2541·2021-11-18 10:02
阅读 2919·2021-10-18 13:32
阅读 1280·2019-08-30 13:54
阅读 2073·2019-08-30 12:58
阅读 3007·2019-08-29 14:06
阅读 2153·2019-08-28 18:10