资讯专栏INFORMATION COLUMN

白话rabbitmq(一): HelloWorld

garfileo / 373人阅读

摘要:作为消息队列的一个典型实践,完全实现了标准,与的快快快不同,它追求的稳定可靠。同一个队列不仅可以绑定多个生产者,而且能够发送消息到多个消费者。消费者接受并消费消息。几乎于完全类似是一个继承了接口的类,方便我们来存储消息队列来的消息。

推广
RabbitMQ专题讲座

https://segmentfault.com/l/15...

CoolMQ开源项目

我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https://github.com/vvsuperman…,项目支持网站: http://rabbitmq.org.cn,最新文章或实现会更新在上面

前言

消息队列想必大家都有一定了解:用来解耦,上级模块不用关心下级模块是否执行成功,最常见的比如说日志,核心系统并不关心日志是否成功,日志什么时候记录。这种情形就可以用消息队列来解耦。

RabbitMQ作为消息队列的一个典型实践,完全实现了AMQ标准,与Kafka的快快快不同,它追求的稳定、可靠。下面就来几篇文章来详细介绍下,均翻译至RabbitMQ的官方文档。

RabbitMQ是一个消息的中介(用来接受以及转发消息),就像是一个非常可靠的邮局,当信件放到邮局时,信件就确保能到达,所以,RabbitMQ可以看成是邮箱、邮局、以及邮递员的合体

RabbitMQ的一些重要概念 produceing(生产者):生产数据

queue(队列):

类似于邮箱,存在于RabbitMQ服务器的内部,用来存储消息,并且消息只能存储在队列里面。队列的大小只受RabbitMQ主机内存和硬盘的影响。同一个队列不仅可以绑定多个生产者,而且能够发送消息到多个消费者。

Consuming(消费者):接受并消费消息。

Hello World

下面我们来写我们的第一个“Hello World”,我们会使用Java的API来编写一个生产者来生产消息,以及一个消费者来消费消息

P是我们的生产者,而C是我们的消费者。中间的box是我们的queue:作为消息缓冲,是RabbitMQ用来存储转发消息给消费者的。

Java客户端库

RabbitMQ支持多重协议,这里我们会用AMQP 0-9-1来说明,它是一个消息队列的通用协议。RabbitMQ同时也有多种语言的客户端,我们在这里用Java来做说明。

首先请下载Java客户端包以及它所依赖的SLF4J和SLF4J SIMPLE,将它们拷贝到自己的工作区。

引入RabbitMQ同样也可以使用Maven来做依赖管理, groupId是com.rabbitmq 以及artifactId amqp-client

发送请求

生产者会发送消息到MQ,然后退出
在Send.java中,首先我们import一些类

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置我们的主类

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}   

创建Connection

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

这里我们连接的是本地,你当然也可以连接到另一个服务器上,只需要指明服务器的名称和ip地址。

下面我们要创建一个Channel,大家可以想象一些,消息的产生和发送都是通过这个Channel完成的。

当然,我们还需要顶一个一个Queue来接受消息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent "" + message + """);

对于Queue的定义是冥等的,如果不存在才会创建,如果存在则不会再建新的。消息会被格式化成byte的数组,方便进行任意的转换。

最后,我们关闭通道

channel.close();
connection.close();

完整的代码可以看这个地方:send.java

接受请求

消费者会从RabbitMQ接收到请求,消息是被到消费者,而且消费者会一直监听着消息队列,一旦有有新的消息就会打印出来。

Recv.java几乎于Send完全类似

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

Defaultconsumer是一个继承了Consumer接口的类,方便我们来存储消息队列来的消息。建立消费者与我们建立生产者非常类似:

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

你可以注意到我们在消费者定义了一个Queue,因此我们是需要在生产者之前启动消费者的,我们要确保我们在消费消息之前这个队列是已经存在的。

然后我们需要告诉mq服务可以推送消息给我们。因为这个推送是异步的,因此我们可以提供一个回调方法,DefaultConsumer会暂时存储这个消息,直到消费者以及准备好来处理接受到的消息了(消息会存储在消费者中直到消费者有能力来消费它,可以想象一下数据库等高IO操作)

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received "" + message + """);
  }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

完整的Recv.java地址

跑起来

我们可以先用javac来编译程序

javac -cp amqp-client-4.0.2.jar Send.java Recv.java

而后来运行它,这需要我们在路径加上它的依赖包,我们首先启动的是消费者

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Recv

而后启动发送者

java -cp .:amqp-client-4.0.2.jar:slf4j-api-1.7.21.jar:slf4j-simple-1.7.22.jar Send

消费者会持续等待,并打印从生产者哪里来的消息,你可以用(Ctrl-C)来停止它。所以你要另外开启一个命令行窗口来运行生产者。

查看队列

也许你想知道RabbitMQ中到底有多少个消息,你可以使用rabbitmqctl工具:

sudo rabbitmqctl list_queues

在Windows中:

rabbitmqctl.bat list_queues




文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70889.html

相关文章

  • 白话RabbitMQ(三):发布/订阅

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。因此一旦有有消息,消息会广播到所有的消费者。如此一来路由器就能够把消息发送给相应的队列了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考源码:https...

    Ververica 评论0 收藏0
  • 白话RabbitMQ(五): 主题路由器(Topic Exchange)

    摘要:推广专题讲座开源项目我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。主题交换机也可以当成其它交换机来使用,假如队列绑定到了那么它会接收所有的消息,就像广播路由器一样而如果未使用,那么就跟直达路由器一样了。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性...

    Gilbertat 评论0 收藏0
  • 白话RabbitMQ(二): 任务队列

    摘要:任务队列最主要的功能就是解耦高耗时的操作,否则程序会一直等在那里,浪费大量资源。我们将任务封装成一个消息发送给队列,后台的任务进程会得到这个任务并执行它,而且可以配置多个任务进程,进一步加大吞吐率。为了确保消息不丢失,支持消息确认。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的...

    fnngj 评论0 收藏0
  • 白话RabbitMQ(四): 建立路由

    摘要:可以参考源码,项目支持网站,最新文章或实现会更新在上面前言在订阅发布中我们建立了一个简单的日志系统,从而将消息广播给一些消费者。因此,发送到路由键的消息会发送给队列,发送到路由键或者的消息会发送给,其它的消息将被丢弃。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决...

    CoderStudy 评论0 收藏0
  • 白话RabbitMQ(六): RPC

    摘要:因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列是一种非常好的方式,这里我们使用了长度为的,的功能是检查消息的的是不是我们之前所发送的,如果是,将返回值返回到。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考...

    KevinYan 评论0 收藏0

发表评论

0条评论

garfileo

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<