资讯专栏INFORMATION COLUMN

白话RabbitMQ(三):发布/订阅

Ververica / 3402人阅读

摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。因此一旦有有消息,消息会广播到所有的消费者。如此一来路由器就能够把消息发送给相应的队列了。

推广
RabbitMQ专题讲座

https://segmentfault.com/l/15...

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

在第二章中我们描述了任务队列,在任务队列中一个消息只会发送给一个消费者。而在这一章中我们将消息发送给许多个消费者,我们称之为“发布/订阅”

为了更好的阐述这个模式,我们会建立一个新的简单的logging系统,包含2个步骤-第一步发送log信息,第二步能够接受并将信息打印出来,而且在第二步中所有的消费者都会接受到同样的消息,比如一个消费者用来将log信息写到磁盘,另外一个接受信息并显示在屏幕上。因此一旦有有消息,消息会广播到所有的消费者。

交换机(Exchanges)

前面的章节中我们是直接通过queue来处理消息,现在我们来介绍一种更完善的模式

让我们迅速浏览一遍前面的主题:

生产者是一个客户端程序,用来发送消息

队列是一个缓冲,用来存储消息

消费者是一个客户端程序,用来接受消息

RabbitMQ的核心思想是生产者不会将消息直接发送给队列,意味着生产者是完全看不到队列的。反之,生产者只能将消息发送给路由器(Exchange),再由路由器来决定该如何来处理消息,是将消息发送给一个队列呢,还是发送给许多个队列,或者直接无视,具体的规则是根据路由器的类型而定的。

路由器的类型有这样几种:直连路由器(dirct), 主题路由器(topic),头部路由器(headers),以及多广播路由器(fanout)

channel.exchangeDeclare("logs", "fanout");

广播路由器听起来就很简单,它会将消息广播到所有的它所知道的队列,而这正是我们所需要的。

默认路由器

在前面的章节中虽然没有设置任何路由器,但依然能够将消息发送到队列,这是因为我们的是默认路由器:使用空字符串("")来做的定义:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是exchange的名称,在这里是空字符串,消息会通过路由健(routingKey)发送到该键所对应的队列。

然而现在,我们有了确认的路由器

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列
我们之前队列都有名字(Hello队列和task_queue队列),给队列起名字非常重要-需要将消费者绑定到特定的queue上面,以及需要把消息从生产者发送给特定的消费者。

但对于日志来说,消息会发送到所有的消费者,而并非个别,We"re also interested only in currently flowing messages not in the old ones.为了满足当前需求我们可以做两件事

一旦连接上RabbitMQ,需要一个新的空队列来接受消息,我们可以随机起个名字,甚至根本不起名,而让RabbitMQ来命名它。

一旦消费者断开连接,这个队列就能被删除掉

我们可以这样定义一个不需要持久化、独立的、能够被自动删除的队列

String queueName = channel.queueDeclare().getQueue();

这个名称是RabbitMQ随机分配的,比如amq.gen-JzTY20BRgKO-HjmUJj0wLg.

绑定

我们已经声明了一个广播路由器,现在需要告诉这个路由器需要把信息发送给哪些队列,路由器和队列间的这个关系就称之为绑定

channel.queueBind(queueName, "logs", "");

如此一来路由器就能够把消息发送给相应的队列了。

整合

发送者与我们之前的代码基本相同,最重大的区别我们现在是发送给带名称的路由器了,同时我们也需要一个路由键,但这里也不需要,因为广播路由器会忽略这个值,这是我们EmitLog.java的代码

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent "" + message + """);

        channel.close();
        connection.close();
    }
    //...
}

可以看到,一旦我们建立的连接立即定义了一个路由器,这个步骤对我们非常重要,因为是严禁将消息发送给并不存在的路由的。

同时,如果路由器没有绑定队列,消息也会丢失掉,但这对于我们来说是ok的:如果并没有消费者在监听,我们可以直接丢弃掉这个消息。

ReciveLogs.java代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received "" + message + """);
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

编译代码

javac -cp $CP EmitLog.java ReceiveLogs.java

如果你希望将log存储到本机上

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你希望在屏幕上显示log信息,打开一个新的终端:

java -cp $CP ReceiveLogs

发送消息

java -cp $CP EmitLog

如此一来,就能够存储消息的同时进行打印了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68123.html

相关文章

  • 白话RabbitMQ(四): 建立路由

    摘要:可以参考源码,项目支持网站,最新文章或实现会更新在上面前言在订阅发布中我们建立了一个简单的日志系统,从而将消息广播给一些消费者。因此,发送到路由键的消息会发送给队列,发送到路由键或者的消息会发送给,其它的消息将被丢弃。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决...

    CoderStudy 评论0 收藏0
  • 【译】RabbitMQ系列() - 发布/订阅模式

    摘要:发布订阅模式在之前的文章里,创建了。我们称之为发布订阅模式。其实我们是用到了默认的,用空字符串来标识。空字符串代表了没有名字的被路由到了由指定名字的。和这种关系的建立我们称之为从现在开始这个就会将推向我们的队列了。 发布订阅模式 在之前的文章里,创建了work queue。work queue中,每一个task都会派发给一个worker。在本章中,我们会完成完全不一样的事情 - 我们会...

    WrBug 评论0 收藏0
  • RabbitMQ发布订阅实战-实现延时重试队列

    摘要:本文将会讲解如何使用实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。 RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门...

    Heier 评论0 收藏0
  • RabbitMQ发布订阅实战-实现延时重试队列

    摘要:本文将会讲解如何使用实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息重新投递,当达到一定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。 RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,如果你还不知道它是什么以及可以用来做什么,建议先从官网的 RabbitMQ Tutorials 入门...

    vslam 评论0 收藏0
  • RabbitMq 最全的性能调优笔记

    摘要:性能调优笔记避免雷区要避免流控机制触发服务端默认配置是当内存使用达到,磁盘空闲空间小于,即启动内存报警,磁盘报警报警后服务端触发流控机制。最佳线程生产者使用多线程发送数据到三到五个线程性能发送最佳,超过它也不能提高生产的发送速率。 RabbitMq 性能调优笔记 [TOC] 避免雷区 要避免流控机制触发 服务端默认配置是当内存使用达到40%,磁盘空闲空间小于50M,即启动内存报警,磁...

    Tony 评论0 收藏0

发表评论

0条评论

Ververica

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<