理论角度,为什么会存在消息丢失?
Producer 角度
- ACK 设置不当:ACK = 0,生产者不会等待 Leader 副本是否接受到,发送完就认为已经成功了。那么可能存在网络波动导致消息丢失问题。
- 重试次数不够:ACK = 1 或者 All,生产者会等待 Leader 副本返回 ACK。但是,如果此时网络出现问题,导致发送失败,生产者在基于配置的重试次数重试后,仍然发送失败,那么就会抛出异常。
Broker 角度
- 磁盘空间不足:如果 Broker 的磁盘空间不足,会导致消息无法被写入日志文件,从而丢失。
- 日志清理策略:例如
log.retention.hours设置过短,导致消息过早被删除。 - Leader 选举慢:如果 Leader Broker 故障,而 Follower 未能及时选举新的 Leader,可能导致消息未能被复制。
- Follower 数据不完整:Leader 副本确认后,数据未同步到 Follower 副本上。此时 Leader 副本挂了,数据就丢失了。
- 分区不可用:更惨烈的情况,机房挂了,所有服务器都不可用了。
Consumer 角度
- 自动提交问题:自动提交时,消费者只要消费到消息,那么下一次 poll 之前会提交 offet。但如果这个消费消息是异步处理的,比如线程池处理,结果线程池或者服务挂了。第 n 次 poll 的消息在线程池还没有处理,第 n+1 次 poll 时提交了第 n 次的 offset,此时服务挂了,第 n 次 poll 的消息相当于是消费丢失。
- 手动提交问题:消息没处理完就进行提交,导致处理失败后消息丢失。
- 消费者组 Rebalance:在消费者组再平衡期间,如果消费者未能正确处理偏移量提交,可能会导致消息丢失。
Consumer A在 Rebalance 时正在处理M1和M2的消息,那么M1和M2的消息对应的偏移量没有提交。Rebalance 后,该分区被交给Consumer B处理,由于M1和M2的消息对应的偏移量没有提交,Consumer B会处理M1和M2的消息。但假如因为业务逻辑或者其他原因,使得Consumer B发现M1和M2被处理过,那么可能会跳过这两个消息。比如要插入数据库,发现该数据存在,因而跳过,但是插入数据库后的逻辑没有执行。 - 消费者断开连接:消费者处理一半断开连接时也可能出现上面消费跳过导致丢失的情况。
- 增加主题分区 & Producer 先于 Consumer 感知到新增加的分区 & Consumer 设置 latest 读取消息。这种情况导致 Consumer 感知到新分区前,Producer 发送的这些消息就全部“丢失”了,或者说 Consumer 无法感知历史消息。解决办法,改成没有 offset 的时候从 ealiest 读取应该就可以了。
网络故障
- 网络带宽限制。
硬件故障
- 磁盘故障 & 节点故障。
怎么验证是否存在消息丢失问题?
可以利用消息队列的有序性来验证是否有消息丢失。
在 Producer 端,每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。需要注意的是,如果一个 Consumer 对应多个分区,需要检查分区内的序号连续性。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,从而避免了代码不会侵入到业务逻辑中。
理论角度,怎么解决消息丢失的问题?
生产者
- 配置恰当的 ACKs 策略:使用
ACKs=all来确保消息在所有 ISR 上持久化。 - 设置合理的重试策略:配置生产者的重试次数和重试间隔,确保消息能够成功发送。
- 不要使用
producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。- Spring Boot 中 Kafka 生产者使用
send方法发送消息实际上是异步的操作,我们可以通过get()方法获取调用结果,但是这样也让它变为了同步操作。所以,需要采用 callback 的方式。详细代码参考:Kafka 系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?
- Spring Boot 中 Kafka 生产者使用
Broker 端
- 设置
unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。 - 设置
replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 - 设置
min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 - 确保
replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1。
消费者端
- 消费者端正确处理偏移量提交:确保消息消费完再提交。
- 消费消息时尽可能幂等性。
其他
- 监控和告警:实时监控 Kafka 集群的状态,包括 Broker 的健康状况、磁盘空间使用情况等,并设置告警机制。
- 备份和恢复:定期备份 Kafka 的数据,并测试恢复流程,以确保在灾难发生时能够快速恢复。