摘要:在中间的框是一个队列的消息缓冲区,保持代表的消费。本教程介绍,这是一个开放的通用的协议消息。我们将在本教程中使用,解决依赖管理。发送者将连接到,发送一条消息,然后退出。注意,这与发送发布的队列匹配。
介绍
RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就是这里的邮箱,邮局和邮差。
RabbitMQ和邮局之间的主要区别是,它不处理纸张,而是接受、存储和转发二进制数据‒消息。
RabbitMQ,和一般的消息传递,使用专业术语。
生产者的工作就是发送消息。发送消息的程序是生产者:
队列类比一个邮箱,存在于RabbitMQ, 然而信息流通过RabbitMQ和您的应用程序,他们只能存储在一个队列。队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。会有许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是我们如何表示队列的方式:
消费者和生产者有着相似的意义. 消费者无非就是等待消息然后处理的程序:
请注意,生产者、消费者和代理不必同一主机上;事实上,在大多数应用程序中它们没有这样做。
"Hello World"(使用PHP amqplib客户端)
在本教程的这一部分中,我们将用PHP编写两个程序;一个生产者发送一条消息,一个用户接收消息并将它们打印出来。我们会PHP amqplib API的忽略一些细节,集中在这个非常简单的事情刚刚开始。这是一个“Hello World”的消息传递。
在下图中,“p”是我们的生产商,“C”是我们的消费者。在中间的框是一个队列的消息缓冲区,RabbitMQ保持代表的消费。
PHP amqplib客户端库RabbitMQ有很多协议。本教程介绍AMQP 0-9-1,这是一个开放的、通用的协议消息。有许多不同的语言RabbitMQ一批客户。我们将在本教程中使用PHP amqplib,composer解决依赖管理。
添加composer.json:
{ "require": { "php-amqplib/php-amqplib": ">=2.6.1" } }
composer install # 或者 直接运行包引入 composer require php-amqplib/php-amqplib现在我们可以开始我们的hello world
生产者(消息发送方)
我们命令我们的消息发布者(发送者)send.php和消息接收receive.php。发送者将连接到RabbitMQ,发送一条消息,然后退出。
require_once __DIR__ . "/vendor/autoload.php"; use PhpAmqpLibConnectionAMQPStreamConnection; use PhpAmqpLibMessageAMQPMessage;
现在我们能创建一个连接服务器的Connection:
$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest"); $channel = $connection->channel();
该连接抽象套接字(socket)连接,并为我们负责协议版本协商和认证等。这里,我们连接到一个rabbitmq代理器在本地机器上-使用localhost。如果我们想在不同的机器上连接到一个代理,我们只需在这里指定它的名称或IP地址。
接下来,我们创建一个通道,这是处理事情的大部分API的地方。
发送消息前,我们必须声明一个队列为我们发送做准备;然后我们可以向队列发布消息:
$channel->queue_declare("hello", false, false, false, false); $msg = new AMQPMessage("Hello World!"); $channel->basic_publish($msg, "", "hello"); echo " [x] Sent "Hello World!" ";
声明队列是幂等的(原句:Declaring a queue is idempotent,这里的idempotent不知道是什么意思) - 只有在它不存在时才会创建队列。消息内容是一个字节数组,因此您可以在那里编码用你喜欢的方式。
最后,我们关闭通道和连接;
$channel->close(); $connection->close();
上面我们完成了send.php.
接下来我们完成消费方的代码
消费者(接收方,任务处理方)消费者从RabbitMQ接收推来的消息,我们会保持运行监听消息并打印出来。
引入lib
require_once __DIR__ . "/vendor/autoload.php"; use PhpAmqpLibConnectionAMQPStreamConnection;
设置与发布程序相同;我们打开一个连接和一个通道,并声明将要消耗的队列。注意,这与发送发布的队列匹配。
$connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest"); $channel = $connection->channel(); $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " ";
注意,我们也在这里声明队列。因为我们可能在发布之前启动消费者,我们希望在我们尝试从它那里消费消息之前确定队列的存在。
我们将告诉服务器从队列中发送消息。我们将定义一个PHP可调用,它将接收服务器发送的消息。请记住,消息是从服务器异步发送到客户机的。
$callback = function($msg) { echo " [x] Received ", $msg->body, " "; }; $channel->basic_consume("hello", "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
当调用basic_consume,我们的代码会阻塞。当我们收到消息时,我们的回调函数将通过接收到返回的消息传递。
以上是我们receive.php的代码
运行测试 运行消费者php receive.php运行消息发送方
php send.php列出队列
rabbitmqctl list_queues完整源码(调整过) config.php
[ "path" => dirname(dirname(__DIR__)) . "/vendor" ], "rabbitmq" => [ "host" => "127.0.0.1", "port" => "5672", "login" => "qkl", "password" => "123456", "vhost" => "/" ] ]; ?>receive.php
channel(); $channel->queue_declare("hello", false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C", " "; $callback = function($msg) { echo " [x] Received ", $msg->body, " "; }; $channel->basic_consume("hello", "", false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); ?>send.php
channel(); //发送方其实不需要设置队列, 不过对于持久化有关,建议执行该行 $channel->queue_declare("hello", false, false, false, false); $msg = new AMQPMessage("Hello World!"); $channel->basic_publish($msg, "", "hello"); echo " [x] Sent "Hello World!" "; $channel->close(); $connection->close(); ?>
了解如何构建一个简单的工作队列, 你可以阅读下一章节: RabbitMQ+PHP 教程二(Work Queues)
翻译来自 RabbitMQ - RabbitMQ tutorial - "Hello World!"
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/26025.html
摘要:基础教程注本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。安装库这里我们首先将消息推入队列,然后消费者从队列中去除消息进行消费。 RabbitMQ 基础教程(1) - Hello World 注:本文是对众多博客的学习和总结,可能存在理解错误。请带着怀疑的眼光,同时如果有错误希望能指出。 如果你喜欢我的文章,可以关注我的私人博客:http:...
摘要:平均每个消费者将得到相同数量的消息。消息确认完成任务可能需要几秒钟。为了确保消息不会丢失,支持消息确认。没有任何消息超时当这个消费者中止了,将会重新分配消息时。这是因为只是调度消息时,消息进入队列。 showImg(https://segmentfault.com/img/bVXNuN?w=332&h=111); 介绍 在上一个 Hello World 教程中,我们编写了从指定队列发送...
摘要:进行插入操作的端称为队尾,进行删除操作的端称为对头。还有个专业术语要了解下生产者发送消息的应用程序被称为生产者。消费者接收消息的应用程序被称为消费者。参考链接下安装及操作常用命令译实战教程一你是否听说过或者使用过队列? 你是否听说过或者使用过消息队列? 你是否听说过或者使用过RabbitMQ? 提到这几个词,用过的人,也许觉得很简单,没用过的人,也许觉得很复杂,至少在我没使用消息队列之前,听...
摘要:每当我们收到一条消息,这个回调函数就被皮卡库调用。接下来,我们需要告诉这个特定的回调函数应该从我们的队列接收消息为了让这个命令成功,我们必须确保我们想要订阅的队列存在。生产者计划将在每次运行后停止欢呼我们能够通过发送我们的第一条消息。 源码:https://github.com/ltoddy/rabbitmq-tutorial 介绍 RabbitMQ是一个消息代理:它接受和转发消息。你...
摘要:我们将任务封装为消息并将其发送到队列。为了确保消息永不丢失,支持消息确认。没有任何消息超时当消费者死亡时,将重新传递消息。发生这种情况是因为只在消息进入队列时调度消息。这告诉一次不要向工作人员发送多个消息。 源码:https://github.com/ltoddy/rabbitmq-tutorial 工作队列 showImg(https://segmentfault.com/img/r...
阅读 1092·2021-11-25 09:43
阅读 1613·2021-09-13 10:25
阅读 2547·2021-09-09 11:38
阅读 3380·2021-09-07 10:14
阅读 1675·2019-08-30 15:52
阅读 597·2019-08-30 15:44
阅读 3524·2019-08-29 13:23
阅读 1930·2019-08-26 13:33