资讯专栏INFORMATION COLUMN

rabbitmq中文教程python版 - Topics

ernest.wang / 3431人阅读

摘要:单词可以是任何东西,但通常它们指定了与该消息相关的一些功能。消息将使用由三个字两个点组成的路由键发送。另一方面,只会进入第一个队列,而只会进入第二个队列。不匹配任何绑定,因此将被丢弃。代码几乎与前一个教程中的代码相同。

源码:https://github.com/ltoddy/rabbitmq-tutorial

Topics

(using the Pika Python client)

本章节教程重点介绍的内容

在之前的教程中,我们改进了日志记录系统。我们没有使用只有虚拟广播的fanout交换,而是使用了direct交换,并让选择性接收日志成为了可能。

尽管使用direct交换改进了我们的系统,但它仍然有局限性 - 它不能根据多个标准进行路由。

在我们的日志系统中,我们可能不仅需要根据严重性来订阅日志,还要根据发布日志的来源进行订阅。您可能从syslog unix工具知道这个概念,
该工具根据严重性(info / warning / crit...)和工具(auth / cron / kern ...)来路由日志。

这会给我们很大的灵活性 - 因为我们可能想听取来自"cron"的error日志,而且还听取来自"kern"的所有日志。

为了在我们的日志系统中实现这一点,我们需要了解更复杂的topic交换。

Topic 交换

发送到topic交换的消息必须有规范的routing_key - 它必须是由点分隔的单词列表。单词可以是任何东西,但通常它们指定了与该消息相关的一些功能。
一些有效的routing_key例子: "stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。只要您愿意,路由键中可以有任意的单词,但最多255个字节。

绑定键也必须是相同的形式。topic交换背后的逻辑与direct topic交换背后的逻辑类似 - 使用特定路由键发送的消息将被传递到与匹配绑定键绑定的所有队列。
但是绑定键有两个重要的特殊情况:

* (star) 可以代替一个字。

# (hash) 可以替代零个或多个单词。

在这个例子中解释这个很简单:

在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的路由键发送。
路由关键字中的第一个单词将描述速度,第二个颜色和第三个物种:" "。

我们创建了三个绑定:Q1绑定了绑定键" *.orange.* ",Q2绑定了" *.*.rabbit "和" lazy.#"。

这些绑定可以概括为:

Q1对所有的橙色动物都感兴趣。

Q2希望听到关于兔子的一切,以及关于懒惰动物的一切。

将路由键设置为"quick.orange.rabbit"的消息将传递到两个队列。消息"lazy.orange.elephant"也会去他们两个。
另一方面,"quick.orange.fox"只会进入第一个队列,而"lazy.brown.fox"只会进入第二个队列。
"lazy.pink.rabbit"只会传递到第二个队列一次,即使它匹配了两个绑定。
"quick.brown.fox"不匹配任何绑定,因此将被丢弃。

如果我们违反我们的合同并发送带有一个或四个单词的消息,如"orange"或"quick.orange.male.rabbit",
会发生什么情况?那么,这些消息将不匹配任何绑定,并会丢失。

另一方面,"lazy.orange.male.rabbit"即使有四个单词,也会匹配最后一个绑定,并将传递到第二个队列。

direct change

  话题交换功能强大,可以像其他交流一样行事。
  当使用" # "(散列)绑定键绑定队列时,它将接收所有消息,
  而不管路由密钥如何 - 就像在*fanout*交换中一样。
  当在绑定中没有使用特殊字符"*"(星号)和"#"(散列)时,主题交换将像*direct*交换一样。
把它放在一起

我们将在我们的日志系统中使用topic交换。我们首先假定日志的路由键有两个单词:" . "。

代码几乎与前一个教程中的代码相同 。

emit_log_topic.py的代码:

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")

routing_key = sys.argv[1:] if len(sys.argv) > 2 else "anonymous.info"
message = " ".join(sys.argv[2:]) or "Hello World"
channel.basic_publish(exchange="topic_logs",
                      routing_key=routing_key,
                      body=message)

print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

receive_logs_topic.py的代码:

#!/usr/bin/env python
import sys
import pika

# connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
connection = pika.BlockingConnection(pika.ConnectionParameters("172.17.0.2"))
channel = connection.channel()

channel.exchange_declare(exchange="topic_logs",
                         exchange_type="topic")

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...
" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange="topic_logs",
                       queue=queue_name,
                       routing_key=binding_key)

print(" [*] Waiting for logs. To exit press CTRL+C")


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

要接收所有日志运行:

python receive_logs_topic.py "#"

要从设施“ kern ” 接收所有日志:

python receive_logs_topic.py "kern.*"

或者,如果您只想听到关于“ critical ”日志的信息:

python receive_logs_topic.py "*.critical"

您可以创建多个绑定:

python receive_logs_topic.py "kern." ".critical"

发布带有路由键“ kern.critical ”类型的日志:

python emit_log_topic.py "kern.critical" "A critical kernel error"

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/41439.html

相关文章

  • rabbitmq中文教程python - 介绍

    摘要:每当我们收到一条消息,这个回调函数就被皮卡库调用。接下来,我们需要告诉这个特定的回调函数应该从我们的队列接收消息为了让这个命令成功,我们必须确保我们想要订阅的队列存在。生产者计划将在每次运行后停止欢呼我们能够通过发送我们的第一条消息。 源码:https://github.com/ltoddy/rabbitmq-tutorial 介绍 RabbitMQ是一个消息代理:它接受和转发消息。你...

    yimo 评论0 收藏0
  • rabbitmq中文教程python - 工作队列

    摘要:我们将任务封装为消息并将其发送到队列。为了确保消息永不丢失,支持消息确认。没有任何消息超时当消费者死亡时,将重新传递消息。发生这种情况是因为只在消息进入队列时调度消息。这告诉一次不要向工作人员发送多个消息。 源码:https://github.com/ltoddy/rabbitmq-tutorial 工作队列 showImg(https://segmentfault.com/img/r...

    tabalt 评论0 收藏0
  • rabbitmq中文教程python - 发布 / 订阅

    摘要:交易所在本教程的前几部分中,我们发送消息并从队列中接收消息。消费者是接收消息的用户的应用程序。中的消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。交换和队列之间的关系称为绑定。 源码:https://github.com/ltoddy/rabbitmq-tutorial 发布 / 订阅 (using the Pika Python client) 本章节教程重点介绍的...

    alphahans 评论0 收藏0
  • rabbitmq中文教程python - 路由

    摘要:为了避免与参数混淆,我们将其称为绑定键。直接交换我们之前教程的日志记录系统将所有消息广播给所有消费者。在这种设置中,使用路由键发布到交换机的消息将被路由到队列。所有其他消息将被丢弃。 源码:https://github.com/ltoddy/rabbitmq-tutorial 路由 本章节教程重点介绍的内容 在之前的教程中,我们构建了一个简单的日志系统 我们能够将日志消息广播给许多接收...

    Hwg 评论0 收藏0
  • rabbitmq中文教程python - 远程过程调用

    摘要:通常用于命名回调队列。对每个响应执行的回调函数做了一个非常简单的工作,对于每个响应消息它检查是否是我们正在寻找的。在这个方法中,首先我们生成一个唯一的数并保存回调函数将使用这个值来捕获适当的响应。 源码:https://github.com/ltoddy/rabbitmq-tutorial 远程过程调用(RPC) (using the Pika Python client) 本章节教程...

    chuyao 评论0 收藏0

发表评论

0条评论

ernest.wang

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<