资讯专栏INFORMATION COLUMN

rabbitmq

Hwg / 2822人阅读

摘要:一关键字和之间的连接关系实际存储消息。生产者进行接受应答,用来确定这条消息是否正常的发送到了,这种方式也是消息的可靠性投递的核心保障。支持消息的过期时间,在消息发送时可以进行指定。可以监听这个队列中消息做相应的处理。

一、rabbitmq关键字
Binding:Exchange和Exchange、Queue之间的连接关系
Queue:实际存储消息。
Durability:是否持久化,Durable:是,Transient:否。
Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动删除。
Message:服务器和应用程序之间传送的数据。本质上就是一段数据,由Properties和Payload(Body)组成。常用属性:delivery mode、headers(自定义属性)
Virtual host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。
二、rabbitmq高级特性

消息如何保证100%的投递成功?

生产端可靠性投递

保障消息的成功发出

保障MQ节点的成功接收

发送端收到MQ节点(Broker)确认应答

完善的消息进行补偿机制

可靠性投递解决方案

消息落库,对消息状态进行打标

消息的延迟投递,做二次确认,回调检查

消息集群镜像队列:rabbitmq集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工作中也是用的最多的。并且实现集群非常简单,一般互联网公司都会构建这种镜像集群模式。https://segmentfault.com/a/11...

幂等性概念详解

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

Confirm确认消息、Return返回消息

理解Confirm消息确认机制:

消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。

生产者进行接受应答,用来确定这条消息是否正常的发送到了Broker,这种方式也是消息的可靠性投递的核心保障。

如何实现Confirm确认消息?

第一步:在channel上开启确认模式:channel.confirmSelect()

第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或者记录日志等后续处理。

Return消息机制

Return Listener用于处理一些不可路由的消息

我们的消息生产者,通过制定一个Exchange和Routing Key,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理。

但是在某写情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener。

Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息。

自定义消费者

    public class MyConsumer extends DefaultConsumer implements Consumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }


    @Override
    public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
        
    }

}

消息的ACK与重回队列

消费端手工ACK(应答成功)和NACK(应答失败)

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿。

由于服务器宕机等严重问题,我们就要手工进行ACK保障消费端消费成功。比如:消费一半的时候宕机了,broker端没有收到应答,重发消息。

消费端重回队列

消费端重回队列是为了对没有处理成功的消息,把消息重新回递给broker。

一般我们在实际应用中,都会关闭重回队列,也就是设置为false。

channel.basciNack(envelope.getDeliveryTag(),false,true) // 为true的话,在消费失败的情况下会重回队列放入队列末端。

消息的限流

假设有一个场景,RabbitMq服务器上有上万条未处理的消息,我们随便打开一个消费者客户端,会出现下面的情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据。RabbitMq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
prefetchSize:0
prefetchCount: 会告诉RabbitMq不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有被ack,则该consumer将block掉,知道有消息ack。
global: true/false,是否将上面设置应用于channel,简答点说,就是上面限制是channel级别的还是consumer级别。
// 限流方式,第一件事就是autoAck设置为false,关闭自动签收,必须手动签收
channel.basicQos(0,3,false); // 3表示如果消息积压了1000条,先给我推3条,这三条消费结束后,我会给你一个ack表示这三条我已经处理完了,然后再给我推送3条...
channel.basicConsume(queueName,false,new MyConsumer(channel))
//在MyConsumer中对消息进行签收ack
channel.basicAck(envelope,getDeliveryTag(), false);

TTL消息

TTL:是Time To Live。就是生存时间。

RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。

RabbitMQ支持队列的过期时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。

死信队列

死信队列:DLX,Dead-Letter-Exchange。
利用DLX,当消息在一个队列中变成了死信(dead message:这条消息没有消费者去消费)之后,他能被重新publish到另外一个Exchange,这个Exchange就是DLX。
进入死信队列(进入死信的三种方式)

1.消息被拒绝(basic.reject or basic.nack)并且requeue=false

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

2.消息TTL过期

3.队列达到最大长度

备注说明

DLX也是一个正常的Exchange,和一般的Exchange没区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ会自动将这个消息重新发送到已经设置了的Exchange上去,进而被路由到另外一个队列上去。

可以监听这个队列中消息做相应的处理。

死信队列设置

首先需要设置死信队列的exchange和queue,然后进行绑定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#

然后进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可:arguments.put("x-dead-letter-exchange","dlx.exchange");

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77668.html

相关文章

  • RabbitMQ】——centos7安装rabbitmq教程 以及 PHP开启rabbitmq扩展

    摘要:第一步安装因为是语言编写的,所以我们首先需要安装第二步安装官网提供的安装方式本人安装成功的方式第三步查看是否已经安装好了,能查到说明已经安装完成了。 第一步:安装Erlang 因为rabbitMQ是Erlang语言编写的,所以我们首先需要安装Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    lscho 评论0 收藏0
  • RabbitMQ】——centos7安装rabbitmq教程 以及 PHP开启rabbitmq扩展

    摘要:第一步安装因为是语言编写的,所以我们首先需要安装第二步安装官网提供的安装方式本人安装成功的方式第三步查看是否已经安装好了,能查到说明已经安装完成了。 第一步:安装Erlang 因为rabbitMQ是Erlang语言编写的,所以我们首先需要安装Erlang rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el...

    printempw 评论0 收藏0
  • 基于RabbitMQ的MQTT插件搭建MQTT服务,使用MQTTX进行收发测试

    摘要:本文基于的插件,针对进行简单的测试。包括协议的介绍,的安装配置开启插件及基于进行的测试。协议是基于发布订阅模型的物联网消息传递协议。对传输消息有三种服务质量最多一次,这一级别会发生消息丢失或重复,消息发布依赖于底层网络。 ...

    ymyang 评论0 收藏0

发表评论

0条评论

Hwg

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<