摘要:发布订阅模式在之前的文章里,创建了。我们称之为发布订阅模式。其实我们是用到了默认的,用空字符串来标识。空字符串代表了没有名字的被路由到了由指定名字的。和这种关系的建立我们称之为从现在开始这个就会将推向我们的队列了。
发布订阅模式
在之前的文章里,创建了work queue。work queue中,每一个task都会派发给一个worker。在本章中,我们会完成完全不一样的事情 - 我们会派发一条message给多个消费者。我们称之为发布订阅模式。
为了更好来说明,我们将要构建一个简单的日志系统。会由两部分代码构成,第一部分来发送日志message,第二部分会接受并打印日志。
在我们的日志系统中,每一个接收程序都会收到日志message。这种方式下,我们可以运行一个接收程序将日志保存到磁盘,同时使用另外一个接收程序将日志打印到屏幕。
本质上来说,发布的日志message会广播到所有运行的接收者。
Exchanges在之前的章节我们通过queue收发message。现在开始介绍Rabbit中的full messaging model。
首先让我们快速的回忆一下之前的章节
producer是一个发送message的用户程序。
queue是保存message的缓冲区
consumer是接收message的用户程序
RabbitMQ的messaging model的核心思想是producer不会直接向queue发送message。实际上,很多时候producer也不知道message会发送到哪些queue。
这里,producer将message发送到exchange。exchange是一个非常简单的东西。一方面它从producer侧接收message,另一方面它把message推送到queue去。 exchange必须知道对接收到的message接着要去做什么。是转发到特定的queue?还是转发到多个queue?还是干脆丢弃掉。这个规则取决于定义时exchange的类型。
exchange有四种可选的类型:direct, topic, headers和fanout. 今天我们聚焦于最后一种-fanout。让我们创建一个fanout类型的exchange,命名为logs
channel.exchangeDeclare("logs","fanout");
fanout类型的exchange是非常简单的。可以从名字上大概猜出其用途,它广播所有的message到它所知道的queue去。这也正是日志应用所期望的。
列出所有的exhange,可以使用rabbitmqctl命令 sudo rabbitmqctl list_exchanges,在列表总会出现一些amq.* 的exchange,和默认的exchange。这些是默认自动创建的,我们不会使用到它们。没有名字的exchange。在之前的章节里我们没有提到过exchanges,我们直接将message发送到queue。其实我们是用到了默认的exchange,用空字符串”“来标识。回想一下,我们像下面这样发布message:
channel.basicPublish("","hello",null,message.getBytes()); 第一个参数就是exchange的名字。空字符串代表了没有名字的exchange:message被路由到了由routingKey指定名字的queue。
现在,我们可以向有名字的exchange发布message。
channel.basicPublish("logs","",null,message.getBytes());Temporary Queue
之前我们使用queue时都会指定名字,如hello和task_queue。给一个queue命名是很重要的,因为我们要给worker指出相同的queue。当需要在生产者和消费者间共享一个queue时,就必须给queue取好名字。
但是在我们日志应用中,情况却有所不同。我们需要接收到所有的log message。我们也关注当前流动的message。我们需要搞定2个事情。
首先,当连接到Rabbit时,我们需要一个全新的,空的queue。因此我们可以自己创建一个随意名字的queue,或是由服务器选择随意的queue名字,这当然是更好的选择。
其次,当我们断开接收者时,该queue可以被自动删除。
在java客户端中,当我们使用无参的queueDeclare()时,我们创建的是使用自动生成名字的一个不持久的,自动删除queue:
String queueName = channel.queueDeclare().getQueue();
可以通过这里来学习到exclusive标志和其他queue的相关属性。
这时queue就具有一个随机的名字,比如像amq.gen-JzTY20BRgKO-HjmUJj0wLg.
Bindings
我们已经创建了一个fanout exchange和queue.现在我们要设置exchange,让它把message发送到我们的queue。exchange和queue这种关系的建立我们称之为binding.
channel.queueBind(queueName,"logs","");
从现在开始logs这个exchange就会将message推向我们的队列了。
可以使用命令rabbitmqctl list_bindings 来列出当前所有的binding。开始执行
生产者程序,和之前章节的代码变化不大,主要的变化是我们将message发送到exchange而不是一个queue。你发现我们在发送的时候会填上一个routingKey,这个值在fanout类型的exchange中是被忽略的。下面是生产者EmitLog.java的代码
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + message + """); } } }
如你所见,在建立connection之后我们声明了exchange.这一步是必要的,发布Message到一个不存在的exchange是不允许的。
如果没有queue绑定到exchange的时候,发布的message是会丢失的,但在现在这个场景是OK的。下面是ReceiveLogs.java的代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
首先进行编译
javac -cp $CP EmitLog.java ReceiveLog.java
如果要把日志保存到文件,则
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果要在控制台看日志,在另一个终端
java -cp $CP ReceiveLogs
最后来发送日志
java -cp $CP EmitLog
使用rabbitmqctl list_bindings,来确认程序创建了我们在代码中指定的binding和queue. 运行两个ReceiveLogs程序,你会看到像下面的输出
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73895.html
摘要:主题模式在上一章我们改进了我们的日志系统,如果使用我们只能简单进行广播,而使用则允许消费者可以进行一定程度的选择。为的会同时发布到这两个。当为时,会接收所有的。当中没有使用通配符和时,的行为和一致。 主题模式 在上一章我们改进了我们的日志系统,如果使用fanout我们只能简单进行广播,而使用direct则允许消费者可以进行一定程度的选择。但是direct还是有其局限性,其路由不支持多个...
摘要:路由模式在之前的文章中我们建立了一个简单的日志系统。更形象的表示,如对中的感兴趣。为了进行说明,像下图这么来设置如图,可以看到有两个绑到了类型为的上。如图的设置中,一个为的就会同时发送到和。接收程序可以选择要接收日志的严重性级别。 路由模式 在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志message广播给很多接收者。 在本篇文章中,我们在这之上,添加一个新的功...
摘要:生产者只能把消息发到交换器。是否要追加到一个特殊的队列是否要追加到许多的队列或者丢掉这条消息这些规则被定义为交换类型。有一点很关键,向不存在的交换器发布消息是被禁止的。如果仍然没有队列绑定交换器,消息会丢失。 发布与订阅 (Publish/Subscribe) 在之前的章节中,我们创建了工作队列,之前的工作队列的假设是每个任务只被分发到一个worker。在这一节中,我们会做一些完全不一...
摘要:如果涉及返回值,就要用到本章提到的了。方法发送请求,并阻塞知道结果返回。当有消息时,进行计算并通过指定的发送给客户端。当接收到,则检查。如果和之前的匹配,则将消息返回给应用进行处理。 RPC模式 在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我们写了通过一个...
阅读 1643·2021-08-30 09:45
阅读 1710·2019-08-30 15:54
阅读 1155·2019-08-30 14:02
阅读 1886·2019-08-29 16:21
阅读 1557·2019-08-29 13:47
阅读 3124·2019-08-29 12:27
阅读 676·2019-08-29 11:01
阅读 2640·2019-08-26 14:04