Kafka中消息的存储

Kafka 消息的存储

消息发到 broker 上以后,消息是如何持久化的呢?在 Kafka 中是使用日志文件的方式来保存消息的,每一条消息都有一个 offset 值来表示在分区中的偏移量。
消息的存储一般都是非常大的,为了避免日志文件过大,log 并不是直接对应一个磁盘上的日志文件,而是对应磁盘上的一个目录。
消息存储在文件中一般都包含,日志文件(.log),索引文件(.index),时间索引文件(*.timeindex)。Kafka 中通过分段的方式将 log 分为多个 logSegment,一个 logSegment 对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。

logSegment

我们可以想象一下,如果 Kafka 中以 partition 为最小存储单位,假设 producer 不断的发送消息,那 partition 文件将会无限扩张,这样对于消息文件的维护以及被消费的消息清理工作带来了非常大的挑战,所以 Kafka 又以 segment 为单位把 partition 进行细分。
每个 partition 相当于一个巨型文件被平均分配到N个 segment 数据文件中,这种方式方便已经被消费的消息的清理,提高了磁盘的利用率。在配置文件中通过log.segment.bytes=107370设置分段大小,默认是1gb。
segment file 由2大部分组成,分别为 index file 和 data file,此2个文件一一对应,成对出现,后缀“.index”和“.log”分别表示为segment索引文件、数据文件。
segment 文件命名规则:partition 全局的第一个 segment 从0开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值进行递增。数值最大为64位long大小,20位数字字符长度,没有数字用0填充。
假如第一个 log 文件的最后一个 offset 为:6666,下一个segment的文件命名为:00000000000000006666.log。对应的index为00000000000000006666.index。可以通过 index 中的索引找到对应 log 中对应的消息。

  1. 根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个 offset 进行命名的,所以,使用二分查找算法能够根据offset快速定位到指定的索引文件。
  2. 找到索引文件后,根据 offset 进行定位,找到索引文件中的符合范围的索引。
  3. 得到 position 以后,再到对应的 log 文件中,从 position 出开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息。

日志清除策略

Kafka 中日志以分段(segment)存储,虽然说以分段存储简化了单个文件的内容大小,但这么一直进行下去,也会有磁盘满的一天,在 Kafka 中会存在以下几种日志清除的策略。

  1. 默认保存时间为7天。超过这个时间则删除
  2. 根据消息的保存时间,当消息在保存的时间超过指定的时间就会触发清理过程,服务端的配置文件中更改log.retention.hours
  3. 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则开始删除最旧的消息。Kafka 会启动一个后台线程,定期检查是否存在可以删除的消息。服务端的配置文件中更改log.retention.bytes

日志压缩策略

Kafka还提供了“日志压缩”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的。
而消费者只关心 key 对应的最新的 value。我们可以开启 Kafka 的日志压缩功能,服务端会在后台启动启动 Cleaner 线程池,定期将相同的 key进行合并,只保留最新的 value 值。
压缩功能在服务端的配置文件中更改log.cleaner.enable
如下图所示