资讯专栏INFORMATION COLUMN

白话RabbitMQ(六): RPC

KevinYan / 2915人阅读

摘要:因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列是一种非常好的方式,这里我们使用了长度为的,的功能是检查消息的的是不是我们之前所发送的,如果是,将返回值返回到。

推广
RabbitMQ专题讲座

https://segmentfault.com/l/15...

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

声明RPC接口

为了阐述RPC我们先建立一个客户端接口,它有一个方法,会发起一个RPC请求,而且会一直阻塞直到有结果返回

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

留意RPC
虽然RPC很常见,但一定要非常小心的使用它,假设rpc调用的是一个非常慢的程序,将导致结果不可预料,而且非常难以调试。

使用RPC时你可以参考下列一些规范

系统设计上要有详细的文档描述,使组件间的依赖讲清晰,做到有据可查

做好错误的异常处理,特别是当RPC服务挂掉或很长时间没有响应时

尽量少用RPC,而使用异步管道,而非阻塞式的RPC,降低系统间的耦合

回调队列(Callback queue)

用RabbitMQ实现RPC比较简单,客户端发起请求,服务端返回对这个请求的响应。为了实现这个功能我们需要一个能够"回调"的队列,我们直接用默认的队列即可

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the     callback_queue ...
消息属性(Message properties)

AMQP 0-9-1 协议为每个消息定义了14个属性,很多属性很少会被用到,但我们要特别留意如下几个

分发模式(deliveryMode): 标记一个消息是否需要持久化(persistent)或者是需要事务(transient)等,在第二章中有描述

消息体类型(contentType): 描述消息中传递具体内容的编码方式,比如我们经常使用的JSON可以设置成:application/json

消息回应(replyTo):用于回调队列

关系Id(correlationId): 用于将RPC的返回值关联到对应的请求。

我们需要引入相应的包

import com.rabbitmq.client.AMQP.BasicProperties;
关系Id(Correlation Id)

在前面的方法中我们为每一个RPC请求都生成了一个队列,这是完全没有必要的,我们为每一个客户端建立一个队列就可以了。

这会引起一个新的问题,因为所有的RPC都是用一个队列,一旦有消息返回,你怎么知道返回的消息对应的是哪个请求呢?所以我们就用到了Correlation Id,作为每个请求独一无二的标识,当我们收到返回值后,会检查这个Id,匹配对应的响应。如果找不到Id所对应的请求,会直接抛弃它。

这里你可能会有疑问,为什么要抛弃掉未知消息呢?而不是抛出异常啥的。这跟我们服务端的竞态条件(possibility of a race condition )会有关系。比如假设我们RabbitMQ服务挂掉了,它刚给我们回复消息,还没等到回应,服务器就挂掉了,那么当RabbitMQ服务重启时,会重发消息,客户端会收到一条重复的消息,为了冥等性的考虑,我们需要仔细的处理返回后的处理方式。

小结

RPC工作过程如下

当客户端启动时,它会创建一个独立的匿名回调队列,然后发送RPC请求,这个RPC
请求会带两个属性:replyTo - RPC调用成功后需要返回的队列名称;correlationId - 每个请求独一无二的标识。RPC服务提供者会等在队列上,一旦有请求到达,它会立即响应,把自己的活干完,然后返回一个结果,根据replyTo返回到对应的队列。而客户端也会等着队列中的信息返回,一旦有一个消息出现,会检查correlationId,将结果返回给响应的请求发起者

整合

Fibonacci级数

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

我们定义个一个fibonacci级数,只能接受正整数,而且是效率不怎么高的那种。
rpc.java如下所示

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    public static void main(String[] argv) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = null;
        try {
            connection      = factory.newConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();

                    String response = "";

                    try {
                        String message = new String(body,"UTF-8");
                        int n = Integer.parseInt(message);

                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    }
                    catch (RuntimeException e){
                        System.out.println(" [.] " + e.toString());
                    }
                    finally {
                        channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

                        channel.basicAck(envelope.getDeliveryTag(), false);

            // RabbitMq consumer worker thread notifies the RPC server owner thread 
                    synchronized(this) {
                        this.notify();
                    }
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

            // Wait and be prepared to consume the message from RPC client.
        while (true) {
            synchronized(consumer) {
        try {
              consumer.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();          
            }
            }
         }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
        try {
                connection.close();
             } catch (IOException _ignore) {}
         }
    }
}

服务端的代码比较直接,首先建立连接,建立channel以及声明队列。我们之后可能会建立多个消费者,为了更好的负载均衡,需要在channel.basicQos中设置prefetchCount,然后设置一个basicConsume监听队列,提供一个回调函数来处理请求以及返回值

RPCClient.java

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
    }

    public String call(String message) throws IOException, InterruptedException {
        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue response = new ArrayBlockingQueue(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    public void close() throws IOException {
        connection.close();
    }

    //...
}

客户端代码如下,我们建立一个连接,声明一个"callback"队列,我们将会往"callback"队列提交消息,并接收RPC的返回值,具体步骤如下:

我们首先生成一个唯一的correlation Id,并保存,我们将会使用它来区分之后所接受到的信息。然后发出这个消息,消息会包含两个属性: replyTo以及collelationId。因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列BlockingQueue是一种非常好的方式,这里我们使用了长度为1的ArrayBlockQueue,handleDelivery的功能是检查消息的的correlationId是不是我们之前所发送的,如果是,将返回值返回到BlockingQueue。此时主线程会等待返回并从ArrayBlockQueue取到返回值

从客户端发起请求

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got "" + response + """);

fibonacciRpc.close();

源代码参考RPCClient.java 和 RPCServer.java
编译

javac -cp $CP RPCClient.java RPCServer.java

我们的rpc服务端好了,启动服务

java -cp $CP RPCServer
# => [x] Awaiting RPC requests

为了获取fibonacci级数我们只需要运行客户端:

java -cp $CP RPCClient
# => [x] Requesting fib(30)

以上的实现方式并非建立RPC请求唯一的方式,但是它有很多优点:如果一个RPC服务过于缓慢,你可以非常方便的水平扩展,只需要增加消费者的个数即可,我们的代码还是比较简单的,有些负责的问题并未解决,比如

如果服务全部挂了,客户端要如何处理

如果服务超时该如何处理

非法信息该如何处理

基础章节的内容到此就结束了,到这里,你就能够基本明白消息队列的基本用法,接下来我们可以进入中级内容内容的学习了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68133.html

相关文章

  • 【译】RabbitMQ系列()-RPC模式

    摘要:如果涉及返回值,就要用到本章提到的了。方法发送请求,并阻塞知道结果返回。当有消息时,进行计算并通过指定的发送给客户端。当接收到,则检查。如果和之前的匹配,则将消息返回给应用进行处理。 RPC模式 在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote ...

    894974231 评论0 收藏0
  • RabbitMQ+PHP 教程RPC

    摘要:有助于将响应与请求关联起来。如果发生这种情况,重新启动的服务器将再次处理请求。又名服务器正在等待该队列上的请求。当消息出现时,它检查属性。然后,我们进入循环,在其中等待请求消息,完成工作并发送响应。 (using php-amqplib) 前提必读 本教程假设RabbitMQ是安装在标准端口上运行(5672)。如果您使用不同的主机、端口或凭据,则连接设置需要调整。 如果您在本教程中遇到...

    anquan 评论0 收藏0
  • 白话rabbitmq(一): HelloWorld

    摘要:作为消息队列的一个典型实践,完全实现了标准,与的快快快不同,它追求的稳定可靠。同一个队列不仅可以绑定多个生产者,而且能够发送消息到多个消费者。消费者接受并消费消息。几乎于完全类似是一个继承了接口的类,方便我们来存储消息队列来的消息。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的...

    garfileo 评论0 收藏0
  • 白话RabbitMQ(三):发布/订阅

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。因此一旦有有消息,消息会广播到所有的消费者。如此一来路由器就能够把消息发送给相应的队列了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https...

    Ververica 评论0 收藏0
  • 白话RabbitMQ(四): 建立路由

    摘要:可以参考源码,项目支持网站,最新文章或实现会更新在上面前言在订阅发布中我们建立了一个简单的日志系统,从而将消息广播给一些消费者。因此,发送到路由键的消息会发送给队列,发送到路由键或者的消息会发送给,其它的消息将被丢弃。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决...

    CoderStudy 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<