摘要:主题模式在上一章我们改进了我们的日志系统,如果使用我们只能简单进行广播,而使用则允许消费者可以进行一定程度的选择。为的会同时发布到这两个。当为时,会接收所有的。当中没有使用通配符和时,的行为和一致。
主题模式
在上一章我们改进了我们的日志系统,如果使用fanout我们只能简单进行广播,而使用direct则允许消费者可以进行一定程度的选择。但是direct还是有其局限性,其路由不支持多个条件。
在我们的日志系统中,消费者程序可能不止是基于日志的severity,同时也想基于发送日志的源系统。你可能知道linux的syslog工具,它就是同时基于severity(info/warn/crit...)和功能(auth/cron/kern...).
这就提供了很大的灵活性-我们想接收来自cron的严重错误日志和kern的所有日志。
下面我们就使用更复杂的topic来改进我们的日志系统。
Topic exchange发送到topic类型exchange的message不可以具有模糊的routing_key,它必须具有以冒号分割的词。就像"stock.usd.nyse","nyse.vmw","quick.orange.rabbit"等,限制长度255字节。
binding key也采用相似的形势。topic exchange的逻辑和direct相似,通过比较message的routing key和bind的binding key,来匹配转发的queue。但是topic的binding支持通配符:
” * “表示任何一个词
” # “ 表示0或1个词
通过上面图示的场景来解释会比较好理解。
例子中我们将发送描述动物的message。message会携带routing key(包含三个词),第一个词表示speed,第二个表示color,第三个表示species"
创建了三个绑定:Q1的binding key是”*.orange.*" Q2的binding key是“*.*.rabbit”和 "lazy.#".
以文字表述便是:
Q1 关心所有橘色的动物
Q2 关心所有的rabbit和所有的lazy动物
routing key为“quick.orange.rabbit"的message会同时发布到这两个queue。
routing key为"lazy.orange.elephant"的message会同时发布到这两个queue。
routing key为”quick.orange.fox“只会发布到第一个queue.
routing key为”lazy.brown.fox"的message只会发布到第二个queue.
routing key为"lazy.pink.rabbit"的message虽然满足Q2的两个条件,但也只会发布到Q2一次。
routing key为"quick.brown.fox"的message没有任何匹配,就会被丢失。
如果我们发送的message只有一个word或者多余三个word,如"orange"或者"quick.orange.male.rabbit"会发生什么呢?这些message不会匹配任何binding key,均会被丢弃掉。
另外"lazy.orange.male.rabbit"虽然具有四个词,但是会匹配最后的binding key,而被发送到第二个queue。
Topic exhange非常强大,同时可以模仿其他两种类型的exchange。当binding key为 # 时,queue会接收所有的message。当binding key中没有使用通配符(* 和 #)时,topic的行为和direct一致。开始执行
我们将在日志系统中使用topic exchange。我们的routding key采用两个词 "
EmitLogTopic.java的代码如下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent "" + routingKey + "":"" + message + """); } } //.. }
ReceiveLogsTopic.java的代码如下:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received "" + delivery.getEnvelope().getRoutingKey() + "":"" + message + """); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
编译
javac -cp $CP ReceiveLogsTopic.java EmitLogTopic.java
接收所有日志
java -cp $CP ReceiveLogsTopic "#"
接收功能"kern"的日志
java -cp $CP ReceiveLogsTopic "kern.*"
接收严重级别日志
java -cp $CP ReceiveLogsTopic "*.critical"
接收者使用两个绑定条件
java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"
发送日志message
java -cp $CP EmitLogTopic "kern.critical" "A critical kernal error"
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73916.html
摘要:路由模式在之前的文章中我们建立了一个简单的日志系统。更形象的表示,如对中的感兴趣。为了进行说明,像下图这么来设置如图,可以看到有两个绑到了类型为的上。如图的设置中,一个为的就会同时发送到和。接收程序可以选择要接收日志的严重性级别。 路由模式 在之前的文章中我们建立了一个简单的日志系统。我们可以通过这个系统将日志message广播给很多接收者。 在本篇文章中,我们在这之上,添加一个新的功...
摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。主题交换机也可以当成其它交换机来使用,假如队列绑定到了那么它会接收所有的消息,就像广播路由器一样而如果未使用,那么就跟直达路由器一样了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性...
摘要:每个消费者会得到平均数量的。为了确保不会丢失,采用确认机制。如果中断退出了关闭了,关闭了,或是连接丢失了而没有发送,会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的。当消费者中断退出,会重新分派。 Work模式 原文地址showImg(https://segmentfault.com/img/bVbqlXr?w=694&h=252); 在第一章中,我们写了通过一个...
摘要:发布订阅模式在之前的文章里,创建了。我们称之为发布订阅模式。其实我们是用到了默认的,用空字符串来标识。空字符串代表了没有名字的被路由到了由指定名字的。和这种关系的建立我们称之为从现在开始这个就会将推向我们的队列了。 发布订阅模式 在之前的文章里,创建了work queue。work queue中,每一个task都会派发给一个worker。在本章中,我们会完成完全不一样的事情 - 我们会...
摘要:如果涉及返回值,就要用到本章提到的了。方法发送请求,并阻塞知道结果返回。当有消息时,进行计算并通过指定的发送给客户端。当接收到,则检查。如果和之前的匹配,则将消息返回给应用进行处理。 RPC模式 在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...
阅读 646·2023-04-25 22:50
阅读 1472·2021-10-08 10:05
阅读 944·2021-09-30 09:47
阅读 1817·2021-09-28 09:35
阅读 771·2021-09-26 09:55
阅读 3282·2021-09-10 10:51
阅读 3388·2021-09-02 15:15
阅读 3253·2021-08-05 09:57