资讯专栏INFORMATION COLUMN

RabbitMQ快速入门

Moxmi / 443人阅读

摘要:就是交换机生产者发送消息给交换机,然后由交换机将消息转发给队列。对应于中则是发送一个,处理完成之后将其返回给。这样来说一个是级别而不是级别的了。当然这些也都是官网的入门例子,后续有机会的话再深入研究。

一、前言

RabbitMQ其实是我最早接触的一个MQ框架,我记得当时是在大学的时候跑到图书馆一个人去看,由于RabbitMQ官网的英文还不算太难,因此也是参考官网学习的,一共有6章,当时是用Node来开发的,当时花了一下午看完了,也理解了。而现在回过头来再看,发现已经忘记了个差不多了,现在再回过头来继续看看,然乎记之。以防再忘,读者看时最好有一定的MQ基础。

二、RabbitMQ

首先我们需要知道的是RabbitMQ它是基于高级队列协议(AMQP)的,它是Elang编写的,下面将围绕RabbitMQ队列、交换机、RPC三个重点进行展开。

2.1、队列

存储消息的地方,多个生产者可以将消息发送到一个队列,多个消费者也可以消费同一个队列的消息。

注意:当多个消费者监听一个队列,此时生产者发送消息到队列只有一个消费者被消费,并且消费端的消费方式是按照消费端在内部启动的顺序轮询(round-robin)。
2.2、消费者

消费消息的一方

public class Send {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP);
        factory.setUsername("admin");
        factory.setPassword("admin");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent "" + message + """);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
public class Recv {

    private final static String QUEUE_NAME = "hello";
    private final static String IP = "172.16.12.162";

    public static void main(String[] args) {
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP);
            factory.setUsername("admin");
            factory.setPassword("admin");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received "" + message + """);
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
2.3、小结

1、Rabbit是如何保证消息被消费的?
答:通过ack机制。每当一个消息被消费端消费的时候,消费端可以发送一个ack给RabbitMQ,这样RabbitMQ就知道了该条消息已经被完整消费并且可以被delete了。;如果一条消息被消费但是没有发送ack,那么此时RabbitMQ将会认为需要重新消费该消息,如果此时还有其它的消费者,那么此时RabbitMQ将会把这条消息交给它处理。

注意:开启ack机制的是autoAck=false;

2、消息如何进行持久化?

将queue持久化,即设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);第二个参数durable为true

设置消息持久化,即设置MessageProperties.PERSISTENT_TEXT_PLAIN

注意:消息持久化并不一定保证消息不会被丢失

3、RabbitMQ如何避免两个消费者一个非常忙一个非常闲的情况?
通过如下设置,保证一个消费者一次只能消费一个消息,只有当它消费完成并且返回ack给RabbitMQ之后才给它派发新的消息。

int prefetchCount = 1 ;
channel.basicQos(prefetchCount)

4、RabbitMQ异常情况下如何保证消息不会被重复消费?
需要业务自身实现密等性,RabbitMQ没有提供比较好的方式去保证。

2.2、交换机

在RabbitMQ中,生产者其实从来不会发送消息到队列,甚至,它不知道消息被发送到了哪个队列。那它被发送到了哪里呢?就是本节的重点:交换机,下面就是它在RabbitMQ中的介绍图。(X就是交换机)生产者发送消息给交换机,然后由交换机将消息转发给队列。

从上图就产生一个问题:X怎么将消息发给queue呢?它是把消息发给所有queue还是发给一个指定的queue或者丢弃消息呢?这就是看交换机的类型了。下面一起谈谈这几种类型

2.2.1、fanout

fanout:广播模式,这个比较好理解,就是所有的队列都能收到交换机的消息。

如上面,两个队列都能收到交换机的消息。

2.2.2、direct

这个模式相当于发布/订阅模式的一种,当交换机类型为direct的时候,此时我们需要设置两个参数:

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));第二个参数,我们可以把它称呼为routeKey

channel.queueBind(queueName, EXCHANGE_NAME, "");第三个参数,我们把它称呼为bindKey

有了这两个参数,我们就可以指定我们订阅哪些消息了。


如图,Q1订阅了orange的消息,Q2订阅了black、green的消息。

2.2.3、topic

其实topic和direct有一点类似,它相当于对direct作了增强。在direct中,我们上面所说的bind routeKey为black、green的它是有限制的,它只能绝对的等于routeKey,但是有时候我们的需求不是这样,我们可能想要的是正则匹配即可,那么Topic就派上用场了。

当类型为topic时,它的bindKey对应字符串需要是以“.”分割,同时RabbitMQ还提供了两个符号:

星号(*):表示1个单词

井号(#):表示0、多个单词

上图的意思是:所有第二个单词为orange的消息发送个Q1,所有最后一个单词为rabbit或者第一个单词为lazy的消息发送给Q2。

2.2.4、header

这一种类型官方demo没有过多解释,这里也不研究了。

2.3、RPC

RabbitMQ 还可以实现RPC(远程过程调用)。什么是RPC,简单来说就是local调用remote方法。对应于RabbitMQ中则是Client发送一个request message,Server处理完成之后将其返回给Client。这里就有了一个疑问?Server是如何将response返回给Client的,这里RabbitMQ定义了一个概念:Callback Queue。
Callback Queue
注意这个队列是独一无二的String replyQueueName = channel.queueDeclare().getQueue();
首先我们需要明白一点的是为什么需要这个queue?我们知道在RabbitMQ作消息队列的时候,Client只需要将消息投放到queue中,然后Server从queue去取就可以了。但是在RabbitMQ作为RPC的时候多了一点就是,Client还需要返回结果,这时Server端怎么知道把消息发送给Client,这就是Callback Queue的用处了。
Correlation Id
在上面我们知道Server返回数据给Client是通过Callback Queue的,那么是为每一个request都创建一个queue吗?这未免太过浪费资源,RabbitMQ有更好的方案。在我们发送request,绑定一个唯一ID(correlationId),然后在消息被处理返回的时候取出这个ID和发出去的ID进行匹配。这样来说一个Callback Queue是Client级别而不是request级别的了。

实现
上面介绍了RabbitMQ实现RPC最重要的两个概念,具体代码比较简单还是贴下把。
client 端

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) throws Exception{
        RPCClient fibonacciRpc = new RPCClient();
        try {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got "" + response + """);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

服务端

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
三、总结

这次回头再看RabbitMQ,再次重新理解了以下RabbitMQ,有些东西还是要慢慢嚼的。当然这些也都是官网的入门例子,后续有机会的话再深入研究。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74445.html

相关文章

  • 快速入门spring-amqp

    摘要:它还为具有侦听器容器的消息驱动的提供支持。接收消息当存在基础结构时,可以使用任何来注释以创建侦听器端点。默认情况下,如果禁用重试并且侦听器抛出异常,则会无限期地重试传递。 Spring-amqp-tutorial Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。它提供了一个模板作为发送和接收消息的高级抽象。它还为具有侦听器容器的消息驱动的PO...

    邹强 评论0 收藏0
  • Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】

    摘要:它通过使用来连接消息代理中间件以实现消息事件驱动的微服务应用。该示例主要目标是构建一个基于的微服务应用,这个微服务应用将通过使用消息中间件来接收消息并将消息打印到日志中。下面我们通过编写生产消息的单元测试用例来完善我们的入门内容。 之前在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,我们通过简单的配置和注解就能实现向Rabbi...

    smallStone 评论0 收藏0
  • RabbitMQ 快速入门 python

    摘要:为了预防消息丢失,提供了,即工作进程在收到消息并处理后,发送给,告知这时候可以把该消息从队列中删除了。如果工作进程挂掉了,没有收到,那么会把该消息重新分发给其他工作进程。之前在发布消息时,的值为即使用。 HelloWorld 简介 RabbitMQ:接受消息再传递消息,可以视为一个邮局。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者...

    wenshi11019 评论0 收藏0

发表评论

0条评论

Moxmi

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<