资讯专栏INFORMATION COLUMN

Rabbitmq各参数详解

ckllj / 1825人阅读

摘要:知识库前言舍弃了繁重的事务消息而使用了消息确认机制实现了分布式事务,实在是解耦之一大神器。但是其配置起来挺麻烦,各种参数,各种调整。消费确认消费确认用来确保消费者是否成功的消费了消息。

rabbitmq知识库: http://rabbitmq.org.cn/

前言:

Rabbitmq舍弃了繁重的事务消息而使用了消息确认机制实现了分布式事务,实在是解耦之一大神器。但是其配置起来挺麻烦,各种参数,各种调整。但国内貌似资料很少,找来找去都找不到,自己撸一发先

1 发送确认

发送确认用来确保消息是否已送达消息队列。消息一旦到达消息服务,就会触发确认机制,可以分为两种情况:

对于无法被路由的消息,一旦没法找到一个队列来消费它,就会触发确认无法消费,此时ack=false。旦有一种情况例外,就是连exchange都没法找到,如果设置了mandatory, 此时就会先触发basic.return,就是会先触发returncallback回调。

对于可以被路由的消息,当消息被(所有的?)queue接受时,会触发ack=true;对于设置了持久化(persistent)的消息,当消息成功的持久化到硬盘上才会触发;对于设置了镜像(mirror)的消息,那么是当所有的mirror接受到这个消息。

2 消费确认(Delivery Acknowledgements)

消费确认用来确保消费者是否成功的消费了消息。一旦有消费者成功注册到相应的消息服务,消息将会被消息服务通过basic.deliver推(push)给消费者,此时消息会包含一个deliver tag用来唯一的标识消息。如果此时是手动模式,就需要手动的确认消息已经被成功消费,否则消息服务将会重发消息(因为消息已经持久化到了硬盘上,所以无论消息服务是不是可能挂掉,都会重发消息)。而且必须确认,无论是成功或者失败,否则会引起非常严重的问题

3 死信交换机(Dead Letter Exchanges)

有三种情况可能进死信交换机

被reject或者nack,并且requeue设置为false

消息最大存活时间(TTL)超时

消息数量超过最大队列长度

只需要设置一个args,就ok拉

channel.exchangeDeclare("some.exchange.name", "direct");
Map args = new HashMap();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);
4 Qos

Channel Prefetch Setting (QoS),表示当前channel中未应答消息的数目,如果超过了,队列中将不再接受新的消息。这里所谓的channel就是指从消息服务到消费者的一个通道,简单来说就是指消息从消息队列发送到消费者了,如果没收到应答,就算是一个Qos

加大这个值会增加消息的发送速度(Throughput),但是会加重消息队列的内存,所以100-300之间是一个比较理想的状态,可参考:http://next.rabbitmq.com/conf... :Channel Prefetch Setting (QoS)。

暴力的设置微100-300是存在一些问题的,如果太大,可能消息全部都压在消费者中而得不到消费,看起来队列是空的,实际上全部积压在客户端;如果太小则得不到消费,浪费资源。具体该怎么设置要根据实际的网络吞吐量、以及消费者的消费能力。比如果消费者很快,是内存操作,那么你设置很大,甚至不设置都可以;但是如果消费者很慢,比如是个数据库操作,那么很可能将消息全部积压到消费者而得不到响应
。可以参考http://www.rabbitmq.com/blog/...

Qos同时也会存在一个问题,一个channel是会被多个消费者的,所以必须计算出所有消费者中未应答的数目,这显然是非常不合理的,而且很麻烦,所以可以改为设置每个消费者缓存(Prefetch Buf)可以允许的最大的数目。

例子:

// 1. 一下子设置所有的queue都是10条unacknowledged
Channel channel = ...;
Consumer consumer = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue", false, consumer);

//2. 分别设置10条
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

//3. 分别设置channel和consume,个人不推荐
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
5 channelCacheSize

当前最大允许空闲的最大channel数。如果在高并发的环境中,如果值过小的话channel会关关开开非常频繁。所以在1.6的版本中,spring amqp将这个值从1提高到了25。同时你也可以从RabbitMQ Admin中心观察到channel关关开开,那么就可以考虑增大cache的值了。

当你遇到 connetion error的错误时,就可以考虑增大channel cache size了。

6 其它参数和配置 delivery tags

通道(channel)中的消息标志,按照正数递增,消息队列中用来标识消息的唯一标识

Blocked Connection Notifications

当消息服务器资源不足时,会向所有的生产者发送这个消息,我们可以捕获这个消息并做处理,资源可以是内存不足,cpu负载过重等等。

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
connection.addBlockedListener(new BlockedListener() {
    public void handleBlocked(String reason) throws IOException {
        // Connection is now blocked
    }

    public void handleUnblocked() throws IOException {
        // Connection is now unblocked
    }
});

话说真出现block了也应该是在管理台直接给出警告,不过也可以做一下避免异常

Multiple

全应答标识。如果设置为true,一条消息应答了,那么之前的全部消息将被应答。比如目前channel中有delivery tags为5,6,7,8的消息,那么一旦8被应答,那么5,6,7将都被应答,如果设置为false,那么5,6,7将不会被应答。(不建议设置,毕竟一个channel中会绑定好多consumer)

basic.nack

当消息服务发生异常时,不会发送basic.ack,反而会发送一个basic.nack,而且不会自动requeue,此时需要消息发送方手动处理,进行重发。只有一种情况会发送nack:“basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue”

1 mq配置的listener到什么地方,是配置到每个微服务、或者是配置到mq中?如果在每个服务里面都配置concurency,是不是随着节点的增加,listener数量也会无限增加?
2 ssl和max-queue-length的配置

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70800.html

相关文章

  • 第三十一章:SpringBoot配置文件application.properties参数详解

    摘要:本章主要是贴出一些相关的配置参数,如果需要修改添加对应的参数配置即可。 本章主要是贴出一些SpringBoot相关的配置参数,如果需要修改添加对应的参数配置即可。 application.properties # ---------------------------------------- # CORE PROPERTIES # --------------------------...

    lastSeries 评论0 收藏0
  • Spring Boot RabbitMQ - 优先级队列

    摘要:官方镜像仓库地址本地运行访问可视化面板地址默认账号默认密码集成基本参数配置配置配置定义优先级队列定义交换器定义参考官方文档应用启动后,会自动创建和,并相互绑定,优先级队列会有如图所示标识。 showImg(https://upload-images.jianshu.io/upload_images/3424642-6085f3f9e43c7a4c.png?imageMogr2/auto...

    jackwang 评论0 收藏0
  • rabbitmq

    摘要:一关键字和之间的连接关系实际存储消息。生产者进行接受应答,用来确定这条消息是否正常的发送到了,这种方式也是消息的可靠性投递的核心保障。支持消息的过期时间,在消息发送时可以进行指定。可以监听这个队列中消息做相应的处理。 一、rabbitmq关键字 Binding:Exchange和Exchange、Queue之间的连接关系Queue:实际存储消息。Durability:是否持久化,Du...

    Hwg 评论0 收藏0
  • Java 类文章 - 收藏集 - 掘金

    摘要:而调用后端服务就应用了的高级特分布式配置管理平台后端掘金轻量的分布式配置管理平台。关于网络深度解读后端掘金什么是网络呢总的来说,网络中的容器们可以相互通信,网络外的又访问不了这些容器。 在 Java 路上,我看过的一些书、源码和框架(持续更新) - 后端 - 掘金简书 占小狼转载请注明原创出处,谢谢!如果读完觉得有收获的话,欢迎点赞加关注 物有本末,事有终始,知所先后,则近道矣 ......

    RayKr 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<