摘要:路由模式在之前的文章中我们建立了一个简单的日志系统。更形象的表示,如对中的感兴趣。为了进行说明,像下图这么来设置如图,可以看到有两个绑到了类型为的上。如图的设置中,一个为的就会同时发送到和。接收程序可以选择要接收日志的严重性级别。
路由模式
在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志message广播给很多接收者。
在本篇文章中,我们在这之上,添加一个新的功能,即允许接收者订阅message的一个子集。举个例子,我们将日志分成多个级别,一个接收者接收错误日志将之保存到磁盘,另一个接收者接收所有日志将之打印到控制台。
Bindings在前面的章节中,我们已经接触过binding了,像下面的代码这样:
channel.queueBind(queueName,EXCHANGE_NAME,"");
binding将exchange和queue关联在了一起。更形象的表示,如:queue对exchange中的message感兴趣。
bindings可以携带一个routingKey参数。为了避免和basic_publish的参数弄混,我们称之它为binding_key.我们像下面这样创建一个binding
channel.queueBind(queueName,EXCHANGE_NAME,"black");
binding key的作用要看exchange的类型,对于fanout类型的exchange,binding key是直接忽略的。
Direct Exchange在之前的日志系统中,message会推送到所有的消费者去。我们想让系统依据message的日志级别进行过滤。比如一个消费者只接收严重级别的日志。
fanout无法帮我们实现这样的功能,它只是无脑的进行广播。
我们使用direct类型的exchange,它的路由算法是非常简单的 - 只要message的routing_key和bind的binding_key相同即进行转发。
为了进行说明,像下图这么来设置
如图,可以看到有两个queue绑到了类型为direct的exchange上。第一个queue绑定用了orange这个binding key,第二个则用了black和green两个binding key。
那么结果就是有routing key为orange的message路由到了Q1.而routing key为black和green的message则路由到了Q2,其他的消息则被丢弃了。
Multiple Bindings
若使用相同的binding key将多个queue绑定到exchange上,就和fanout的行为一样了,message会广播到binding key相同的queue去。如图的设置中,一个routing key为black的message就会同时发送到Q1和Q2。
我们将在我们的日志系统上应用这个模型,使用direct类型的exchange去替代fanout类型的exchange。提供日志的严重性作为routing key。接收程序可以选择要接收日志的严重性级别。
首先我们创建exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后就是发送message
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
我们先假设severity取值 info | warning | error
Subscribing接收message和上一章没什么区别,只是需要给各个severity创建新的binding。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }开始执行
EmitLogDirect.java代码如下
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + severity + "":"" + message + """); } } //.. }
ReceiveLogsDirect.java代码如下:
import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + delivery.getEnvelope().getRoutingKey() + "":"" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
编译代码
javac -cp $CP ReceiveLogsDirect.java EmitLogDirect.java
如果想把warning和error的日志保存到文件去,那么
java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果想把所有的日志打印到控制台,那么
java -cp $CP ReceiveLogsDirect info warning error
发送error日志
java -cp $CP EmitLogDirect error "Run.Run. Or it will explode"
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73892.html
摘要:主题模式在上一章我们改进了我们的日志系统,如果使用我们只能简单进行广播,而使用则允许消费者可以进行一定程度的选择。为的会同时发布到这两个。当为时,会接收所有的。当中没有使用通配符和时,的行为和一致。 主题模式 在上一章我们改进了我们的日志系统,如果使用fanout我们只能简单进行广播,而使用direct则允许消费者可以进行一定程度的选择。但是direct还是有其局限性,其路由不支持多个...
摘要:发布订阅模式在之前的文章里,创建了。我们称之为发布订阅模式。其实我们是用到了默认的,用空字符串来标识。空字符串代表了没有名字的被路由到了由指定名字的。和这种关系的建立我们称之为从现在开始这个就会将推向我们的队列了。 发布订阅模式 在之前的文章里,创建了work queue。work queue中,每一个task都会派发给一个worker。在本章中,我们会完成完全不一样的事情 - 我们会...
摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我们写了通过一个...
摘要:如果涉及返回值,就要用到本章提到的了。方法发送请求,并阻塞知道结果返回。当有消息时,进行计算并通过指定的发送给客户端。当接收到,则检查。如果和之前的匹配,则将消息返回给应用进行处理。 RPC模式 在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
摘要:生产者只能把消息发到交换器。是否要追加到一个特殊的队列是否要追加到许多的队列或者丢掉这条消息这些规则被定义为交换类型。有一点很关键,向不存在的交换器发布消息是被禁止的。如果仍然没有队列绑定交换器,消息会丢失。 发布与订阅 (Publish/Subscribe) 在之前的章节中,我们创建了工作队列,之前的工作队列的假设是每个任务只被分发到一个worker。在这一节中,我们会做一些完全不一...
阅读 1598·2021-11-11 10:59
阅读 2608·2021-09-04 16:40
阅读 3624·2021-09-04 16:40
阅读 2953·2021-07-30 15:30
阅读 1581·2021-07-26 22:03
阅读 3146·2019-08-30 13:20
阅读 2200·2019-08-29 18:31
阅读 420·2019-08-29 12:21