摘要:生产者只能把消息发到交换器。是否要追加到一个特殊的队列是否要追加到许多的队列或者丢掉这条消息这些规则被定义为交换类型。有一点很关键,向不存在的交换器发布消息是被禁止的。如果仍然没有队列绑定交换器,消息会丢失。
发布与订阅 (Publish/Subscribe)
在之前的章节中,我们创建了工作队列,之前的工作队列的假设是每个任务只被分发到一个worker。在这一节中,我们会做一些完全不一样的事--把一条消息发送给多个消费者,这个模式叫做“发布/订阅”(publish/subscribe)。
举个例子,我们要构建一个简易的日志系统。由两个程序组成---一个来发出日志消息,另一个接收并把消息显示出来。
在我们的日志系统当中,每一个正在运行的接收程序都会收到消息。这样,我们可以运行一个receiver并把log定向到磁盘,然后再跑一个receiver,看看它是否会在屏幕上显示日志。
事实上,被发布的消息会被广播到所有的receiver那里。
交换器(Exchanges)在之前的引导中,我们从一个队列中做了收发的操作。是时候介绍在Rabbit中的全部的消息模型了。
让我们先快速地回顾一下之前学习的,
producer 是一个发送消息的应用
queue 是一个存储消息的buffer
consumer 是一个接收消息的应用
RabbitMQ中,消息模型的核心思想是生产者绝不会把消息直接发到队列。实际上,生产者通常不知道一条消息是否已经被发送到任意一个队列中。
生产者只能把消息发到交换器。交换器是个简单的东西。一方面接收从生产者那边来的消息,另一方面把他们push到队列中。交换器一定要知道当它们接收到消息之后要如何处理。是否要追加到一个特殊的队列?是否要追加到许多的队列?或者丢掉这条消息?这些规则被定义为交换类型。
以下是可以使用的交换类型:direct, topic, header, fanout。我们介绍一下最后一个--fanout。让我们先创建一个fanout类型的交换器“logs”:
ch.assertExchange("logs", "fanout", {durable: false})
fanout类型的交换器非常简单,我们可以从单单从名字上猜测,它就是把它接收到的消息广播给所有已知的队列。这也就是我们的logger所需要的。
列出所有的交换器(Listing exchanges)
你可以使用rabbitmqctl
$sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
在列表中,一些amq.*的交换器和一些默认的(未命名的),都是被默认创建的,但是可能是你用不到的
未命名的交换器(Nameless exchange)
在之前的章节中我们未提过交换器,但是我们仍然能够把消息传到队列中,这就是我们使用了默认的交换器,因为我们使用了空的字符串("")。
之前我们是这样发布一条消息的
ch.sendToQueue("hello", new Buffer("Hello World!"));
这里我们使用默认的或者未命名的交换器,如果第一个参数存在的话,消息会被路由到这个参数名的队列。
现在,我们可以使用我们定义好的交换器
ch.publish("logs", "", new Buffer("Hello World!"));
第二个参数为空的话代表我们不想把消息推到指定的队列,只是想发布到logs的交换器中。
临时队列 (Temporary queues)你还记得我们之前用的声明过的队列(hello 和 task_queue)吗?。能够指明一个队列的名字对我们来说是重要的--我们需要把workers指到相同的队列。
当你想要分享给消费者和生产者队列的时候,给队列起一个名字很重要。
但着不是我们logger这个程序需要的,我们想监听所有的log消息,不是一部分log消息。同样的,我们对正在流动的消息也感兴趣(not in the old ones).我们需要完成两件事情:
第一,不管我们什么时候连接Rabbit,都需要一个新的,空的队列。我们可以创建一个随机的队列名字,或者让服务器为我们随机选择一个队列名字。
第二,不管我们什么时候断开与消费者的连接,队列需要自动销毁。
在amqp.node的客户端中,当我们传入字符串的时候,可以创建一个带有名字的未持久化的队列
ch.assertQueue("", {exclusive: true});
这个方法返回一个带有随机名字的队列实例,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg。
当连接被断开的时候,这个队列会被销毁,因为我们在声明的时候{exclusive:true}
我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器把消息发送给队列,队列与交换器之间的关系我们称之为绑定。
ch.bindQueue(queue_name, "logs", "");
现在开始,logs的交换器为追加消息到我们的队列
Listing bindings:
你可以列出已经存在的绑定关系,你应该猜到。rabbitmqctl list_bindings。
整合(Putting it all together)生产者的程序,用来发出log消息,和之前章节没有太多的不同,最重要的改变就是现在我们是把消息发布到我们的logs的交换器中,而不是之前的在未声明的情况下使用。发送的时候我们需要提供一个路由键,但是在fanout类型当中,这个可以忽略。下面是emit_log.js的代码
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var ex = "logs"; var msg = process.argv.slice(2).join(" ") || "Hello World!"; ch.assertExchange(ex, "fanout", {durable: false}); ch.publish(ex, "", new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
(emit_log.js 源码)
正如你所见,在与交换器建立连接之后。有一点很关键,向不存在的交换器发布消息是被禁止的。
如果仍然没有队列绑定交换器,消息会丢失。但是对我们来说还好,如果仍然没有消费者监听,我们可以安全地丢弃这些消息。
receive_logs.js的代码
#!/usr/bin/env node var amqp = require("amqplib/callback_api"); amqp.connect("amqp://localhost", function(err, conn) { conn.createChannel(function(err, ch) { var ex = "logs"; ch.assertExchange(ex, "fanout", {durable: false}); ch.assertQueue("", {exclusive: true}, function(err, q) { console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); ch.bindQueue(q.queue, ex, ""); ch.consume(q.queue, function(msg) { console.log(" [x] %s", msg.content.toString()); }, {noAck: true}); }); }); });
(receive_logs,js源码)
如果你想要保存log,你可以打开控制台输入
$ ./receive_logs.js > logs_from_rabbit.log
如果你想在屏幕上看到log,再打开一个控制台
$ ./receive_logs.js
当然,需要发出logs
$ ./emit_log.js
使用rabbitmqctl list_bindings,你可以确定刚才的代码确实创建了交换器和队列,有两个receive_logs.js的程序在运行。
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
这个结果的简要解释:数据从logs交换器到两个服务器分配的队列。这也是我们想要的结果。
要如何监听一部分的消息?让我们移到下一章。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/88110.html
摘要:这样的消息分发机制称作轮询。在进程挂了之后,所有的未被确认的消息会被重新分发。忘记确认这是一个普遍的错误,丢失。为了使消息不会丢失,两件事情需要确保,我们需要持久化队列和消息。 工作队列 showImg(https://segmentfault.com/img/remote/1460000008229494?w=332&h=111); 在第一篇中,我们写了一个程序从已经声明的队列中收发...
摘要:一个表示编译器检测到一个无效的引用值。在实际情况中,往往是在获取一个未被赋值的引用时被抛出。任何一个函数上下文都有一个被称为活动对象的变量对象。没有找到的话,就会认为引用名没有基础值并抛出的错误。下没有下的属性仅存在于被启动的情况下。 和其他语言相比,javascript中的对于undefined的理解还是有点让人困惑的。特别是试着理解ReferenceErrors错误(x is no...
摘要:概述技术栈错误详情报警机器人经常有如下警告过程确定报错位置有日志就很好办首先看日志在哪里打的从三个地方入手我们自己的代码没有的代码从上下来没有的代码在容器中执行 bug概述 技术栈 nginx uwsgi bottle 错误详情 报警机器人经常有如下警告: 1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 - - Sock...
摘要:允许接收和转发消息。一个等待接收消息的程序是一个消费者。发送者会先连接到发送一条消息,然后退出。注意这里的是要和之前的名称一致。翻译日期另因为想入门第一次想着翻译,第一次然后希望多多提出不足。 gitBook https://joursion.gitbooks.io/... Title: RabbitMQ tutorials ---- Hello World (Javascript) ...
阅读 2434·2021-11-18 10:02
阅读 694·2021-10-08 10:04
阅读 2268·2021-09-03 10:51
阅读 3551·2019-08-30 15:44
阅读 2807·2019-08-29 14:09
阅读 2472·2019-08-29 12:21
阅读 2068·2019-08-26 13:45
阅读 1811·2019-08-26 13:25