摘要:消费端弄丢了数据关闭自动提交,在自己处理完毕之后手动提交,这样就不会丢失数据。弄丢了数据一般要求设置个参数来保证消息不丢失给设置参数这个值必须大于,表示要求每个必须至少有个副本。上一篇如何保证消息不重复消费下一篇如何保证消息按顺序执行
1.mq原则数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的。
2.丢失数据场景丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景,
(1)rabbitmq
A:生产者弄丢了数据
生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
B:rabbitmq自己丢了数据
如果没有开启rabbitmq的持久化,那么rabbitmq一旦重启,那么数据就丢了。所依必须开启持久化将消息持久化到磁盘,这样就算rabbitmq挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,rabbitmq还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
C:消费端弄丢了数据
主要是因为消费者消费时,刚消费到,还没有处理,结果消费者就挂了,这样你重启之后,rabbitmq就认为你已经消费过了,然后就丢了数据。
3.如何防止消息丢失
(1)rabbitmq
A:生产者丢失消息
①:可以选择使用rabbitmq提供是事物功能,就是生产者在发送数据之前开启事物,然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。
channel.txSelect();//开启事物 try{ //发送消息 }catch(Exection e){ channel.txRollback();//回滚事物 //重新提交 }
缺点: rabbitmq事物已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
②:可以开启confirm模式。在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。
//开启confirm channel.confirm(); //发送成功回调 public void ack(String messageId){ } // 发送失败回调 public void nack(String messageId){ //重发该消息 }
二者不同
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后rabbitmq会回调告知成功与否。
一般在生产者这块避免丢失,都是用confirm机制。
B:rabbitmq自己弄丢了数据
设置消息持久化到磁盘。设置持久化有两个步骤:
①创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
②发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。
必须要同时开启这两个才可以。
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
C:消费者弄丢了数据
使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
(2)kafka
A:消费端弄丢了数据
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
B:kafka弄丢了数据
一般要求设置4个参数来保证消息不丢失:
①给topic设置 replication.factor参数:这个值必须大于1,表示要求每个partition必须至少有2个副本。
②在kafka服务端设置min.isync.replicas参数:这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。
③在生产者端设置acks=all:表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了
④在生产者端设置retries=MAX(很大的一个值,表示无限重试):表示 这个是要求一旦写入事变,就无限重试
C:生产者弄丢了数据 如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
上一篇《如何保证消息不重复消费》
下一篇《如何保证消息按顺序执行》
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/7233.html
摘要:能不能支持数据丢失啊可以的,参考我们之前说的那个数据零丢失方案其实一个肯定是很复杂的,其实这是个开放题,就是看看你有没有从架构角度整体构思和设计的思维以及能力。其实回答这类问题,说白了,起码不求你看过那技术的源码,起码你大概知道那个技术的基本原理,核心组成部分,基本架构构成,然后参照一些开源的技术把一个系统设计出来的思路说一下就好 比如说这个消息队列系统,我们来从以下几个角度来考虑一下 (1...
摘要:数量对吞吐量的影响可以达到几百几千个的级别,吞吐量会有小幅度的下降。这是的一大优势,可在同等数量机器下支撑大量的从几十个到几百个的时候,吞吐量会大幅下降。下一篇如何保证消息队列的高可用 1.为什么使用消息队列? (1)解耦:可以在多个系统之间进行解耦,将原本通过网络之间的调用的方式改为使用MQ进行消息的异步通讯,只要该操作不是需要同步的,就可以改为使用MQ进行不同系统之间的联系,这样项目之间...
摘要:一个对应一个,但是里面进行了多线程消费,这样也会造成消息消费顺序错误。保证消息的消费顺序拆分多个,每个一个,就是多一些而已,确实是麻烦点这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式取消费。 1.为什么要保证顺序 消息队列中的若干消息如果是对同一个数据进行操作,这些操作具有前后的关系,必须要按前后的顺序执行,否则就会造成数据异常。举例: 比如通过mysql binlog进行两个数据...
摘要:紧接着征用倍的机器来部署,每一批消费一个临时的消息。这种做法相当于临时将资源和资源扩大倍,以正常速度的倍来消费消息。解决方案这种情况下,实际上没有什么消息挤压,而是丢了大量的消息。 1.大量消息在mq里积压了几个小时了还没解决 场景: 几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上很晚,10点多,11点多。线上故障了,这个时候要不然就是修复consumer的问题,让他恢复消...
摘要:的过期策略是什么样的采用了定期删除惰性删除的过期策略。定期删除原理定期删除指的是默认每隔就随机抽取一些设置了过期时间的,检测这些是否过期,如果过期了就将其删掉。所有只会抽取一部分而不会全部检查。 1.数据为什么会过期? 首先,要明白redis是用来做数据缓存的,不是用来做数据存储的(当然也可以当数据库用),所以数据时候过期的,过期的数据就不见了,过期主要有两种情况, ①在设置缓存数据时制定了...
阅读 842·2021-11-24 10:44
阅读 2777·2021-11-11 16:54
阅读 3159·2021-10-08 10:21
阅读 2064·2021-08-25 09:39
阅读 2899·2019-08-30 15:56
阅读 3458·2019-08-30 13:46
阅读 3492·2019-08-23 18:09
阅读 2064·2019-08-23 17:05