June's Studio.

MQ-kafka服务器宕机了,消息会丢失吗

字数统计: 2.1k阅读时长: 7 min
2023/03/29

消息队列可谓是高并发下的必备中间件了,而 Kafka 作为其中的佼佼者,经常被我们使用到各种各样的场景下。
随着 Kafka 而来得,还有三个问题:消息丢失、消息重复、消息顺序。我们首先聊聊消息丢失的问题。

文章思维导图

可靠性级别

回到标题提出的问题:我们是否真的能保证 Kafka 消息不丢失?

答案是:我们无法保证 Kafka 消息不丢失,只能保证某种程度下,消息不丢失。

这里所说的某些情况,从严重程度依次为:Kafka 宕机、服务器宕机、机房地震、城市毁灭、地球毁灭。不要觉得树哥在危言耸听,如果你的服务器部署在乌克兰的首都,那是不是就会遭遇城市毁灭的风险了?因此,我们根据业务的重要程度,设置合理的可靠性级别,毕竟可靠性级别越高,付出的成本越高。

如果你的应用是金融类型或者国民级别的应用,那么你需要考虑机房地震以上级别的可靠性级别,否则一般考虑到服务器宕机这个维度就可以了。对于机房地震级别以上的情况,大多数都是需要做异地多活,然后做好各地机房数据的实时同步。即使地球毁灭了,你在火星部署了一个机房,其原理也是类似。

我想大多数同学的应用可靠性,可能都只需要考虑到服务器宕机级别,因此后续的考虑也仅限于这个级别。

从大局看 Kafka

要让 Kafka 消息不丢失,那么我们必须知道 Kafka 可能在哪些地方丢数据,因此弄清楚 Kafka 消息流转的整个过程就非常重要了。对 Kafka 来说,其整体架构可以分为生产者、Kafka 服务器、消费者三大块,其整体架构如下图所示。

Kafka 消息架构图

生产者

对生产者来说,其发送消息到 Kafka 服务器的过程可能会发生网络波动,导致消息丢失。对于这一个可能存在的风险,我们可以通过合理设置 Kafka 客户端的 request.required.acks 参数来避免消息丢失。该参数表示生产者需要接收来自服务端的 ack 确认,当收不到确认或者超市时,便会抛出异常,从而让生产者可以进一步进行处理。

该参数可以设置不同级别的可靠性,从而满足不同业务的需求,其参数设置及含义如下所示:

  • request.required.acks = 0 表示 Producer 不等待来自 Leader 的 ACK 确认,直接发送下一条消息。在这种情况下,如果 Leader 分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
  • request.required.acks = 1 表示 Producer 等待来自 Leader 的 ACK 确认,当收到确认后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader 服务器,但并不保证 Follow 节点已经同步完成。所以如果在消息已经被写入 Leader 分片,但是还未同步到 Follower 节点,此时Leader 分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
  • request.required.acks = -1 表示 Producer 等待来自 Leader 和所有 Follower 的 ACK 确认之后,才发送下一条消息。在这种情况下,除非 Leader 节点和所有 Follower 节点都宕机了,否则不会发生消息的丢失。

如上所示,如果业务对可靠性要求很高,那么可以将 request.required.acks 参数设置为 -1,这样就不会在生产者阶段发生消息丢失的问题。

Kafka 服务器

当 Kafka 服务器接收到消息后,其并不直接写入磁盘,而是先写入内存中。随后,Kafka 服务端会根据不同设置参数,选择不同的刷盘过程,这里有两个参数控制着这个刷盘过程:

1
2
3
4
# 数据达到多少条就将消息刷到磁盘
#log.flush.interval.messages=10000
# 多久将累积的消息刷到磁盘,任何一个达到指定值就触发写入
#log.flush.interval.ms=1000

如果我们设置 log.flush.interval.messages=1,那么每次来一条消息,就会刷一次磁盘。通过这种方式,就可以降低消息丢失的概率,这种情况我们称之为同步刷盘。 反之,我们称之为异步刷盘。与此同时,Kafka 服务器也会进行副本的复制,该 Partition 的 Follower 会从 Leader 节点拉取数据进行保存。然后将数据存储到 Partition 的 Follower 节点中。

对于 Kafka 服务端来说,其会根据生产者所设置的 request.required.acks 参数,选择什么时候回复 ack 给生产者。对于 acks 为 0 的情况,表示不等待 Kafka 服务端 Leader 节点的确认。对于 acks 为 1 的情况,表示等待 Kafka 服务端 Leader 节点的确认。对于 acks 为 1 的情况,表示等待 Kafka 服务端 Leader 节点好 Follow 节点的确认。

但要注意的是,Kafka 服务端返回确认之后,仅仅表示该消息已经写入到 Kafka 服务器的 PageCache 中,并不代表其已经写入磁盘了。这时候如果 Kafka 所在服务器断电或宕机,那么消息也是丢失了。而如果只是 Kafka 服务崩溃,那么消息并不会丢失。

因此,对于 Kafka 服务端来说,即使你设置了每次刷 1 条消息,也是有可能发生消息丢失的,只是消息丢失的概率大大降低了。

消费者

对于消费者来说,如果其拉取消息之后自动返回 ack,但消费者服务在处理过程中发生崩溃退出,此时这条消息就相当于丢失了。对于这种情况,一般我们都是采用业务处理完之后,手动提交 ack 的方式来避免消息丢失。

在我们在业务处理完提交 ack 这种情况下,有可能发生消息重复处理的情况,即业务逻辑处理完了,但在提交 ack 的时候发生异常。这要求消费者在处理业务的时候,每一处都需要进行幂等处理,避免重复处理业务。

能不丢失吗?

根据我们上面的分析,Kafka 只能做到 Kafka 应用崩溃这个级别,因为 Kafka 的 acks 仅仅表示写入了 PageCache。

如果服务器宕机了,即使我们设置了每来一条消息就写入一次磁盘,那么也有可能在写入 PageCache 后、写入磁盘前这个关键点,服务器发生宕机。这时候 PageCache 里面的消息数据就没了,那么消息自然也就丢失了。但如果仅仅是 Kafka 应用崩溃退出,因为其已经写入到 PageCache 中了,那么系统自然会将其写入到磁盘中,因此消息并不会丢失。

总结

消息可靠性级别,一定是跟业务重要性关联在一起的。我们无法抛开业务本身的重要性来谈可靠性,只能是取一个平衡的值。

根据我的经验来说,除非是金融类或国民级别的应用,否则只需要考虑到服务器宕机的级别就可以了。而如果是金融级别或国民级别的应用,那么就需要考虑到城市毁灭的可靠性级别。但地球毁灭这个,我想谁也不会去考虑吧。

对于大多数的应用,考虑服务器宕机级别的情况下,对于 Kafka 消息来说,只需要考虑如下几个内容即可:

  1. 生产者。 根据业务重要性,设置好 acks 参数,并做好业务重试,以及告警记录即可。
  2. Kafka 服务端。 根据业务重要性,设置好刷盘参数即可,一般来说都不需要设置成同步刷盘。
  3. 消费者。 使用手动提交 acks 的方式,避免丢失消息,同时需要做好幂等处理,避免重复处理。

本文的思维导图如下所示。

文章思维导图

CATALOG
  1. 1. 可靠性级别
  2. 2. 从大局看 Kafka
    1. 2.1. 生产者
    2. 2.2. Kafka 服务器
    3. 2.3. 消费者
  3. 3. 能不丢失吗?
  4. 4. 总结