kafka的消息持久化文件

时间:2023-01-09 14:17:08

最近排查kafka的问题,涉及到了kafka的消息存储,本文就相关内容进行总结。

我们都知道,topic是有分区(partition)的概念的, 生产者往同一个topic发送的消息最终是发送到了不同的分区里面。也就是说,一个topic里的消息是由该topic下所有分区里的消息组成的在同一个分区里,消息是有序的,而不同分区中,消息是不能保证有序的。

有了这个简单认识后,自然可以知道,每个分区仅会存储该分区下的消息。在配置("log.dirs")指定的目录下,有很多以"$topic-$partition"为名称的目录,在每个这样的目录下,就存放对应"topic",对应"partition"消息的持久化文件。

kafka的消息持久化文件

注:由于每个分区都有leader的概念,而不同分区的leader可能位于不同的broker上,除了leader之外,分区还有副本(replica)的概念,因此每个broker只会存储分区leader或副本位于该broker中的topic的消息

在《kafka客户端消息发送逻辑》一文中提到了,生产者发送消息时,其实是一批(batch)一批来发送的,一批消息中可能包含一条或多条消息。同时,上面也提到了,同一个topic的同一个分区里的消息是有序的。有序的消息通常会有一个偏移量的概念。kafka内部对消息持久化存储时,也遵循类似的理念,按批次存储,同时记录消息的偏移位置,以及消息的时间戳等信息。

在具体实现中,一个分区内的消息,划分为多个segment,segment是一个逻辑概念,一个segment对应一个消息段,一个消息段中又包含一批或多批消息(如下图中的RecordBatch),一批消息就是客户端按batch组装发送过来的消息集,包含一条或多条消息(如下图中的Record)。

一个segment由三个文件组成,分别为消息文件(*.log)存储具体的消息内容、消息索引文件(*.index)存储消息在分区中的索引、消息时间戳索引文件(*.timeindex)则存储了消息对应的时间戳。这三个文件均以文件中存储的首个消息在分区中的偏移量作为文件名的前缀。

kafka的消息持久化文件

接下来就分别讲述下这几个文件的具体格式。

1) *.log

log文件中的内容就是一个segment中实际包含的消息然后按批次进行存储。每一批消息都包含固定字节长度的头部信息,以及一到多条消息。在头部信息中存储了基准偏移(BaseOffset),即该批次中的第一条消息在整个分区中的偏移位置;长度(Length);分区leader的epoch(LeaderEpoch);用于指定消息存储格式的魔数(Magic);校验和(CRC);消息批属性(Attributes);该批次最后一条消息与第一条消息的相对偏移(LastOffsetDelta);第一条消息的时间戳(FirstTimestamp);该批次的消息数(RecordCount)等内容。

而每条消息则记录了消息的整体长度、属性、消息的key、实际内容、头信息等。

kafka的消息持久化文件

需要注意的是:在消息的存储格式中,除属性字段固定1字节外,其他信息均采用zigzag的编码方式这样可以有效压缩存储空间。

以一个实际文件内容,对照上面的方式解析来查看下,文件的开头如下所示:

kafka的消息持久化文件

对照上面的格式可以得到:

0000 0000 0000 0000  BaseOffset:  0
0000 3cb8            Length: 15544
0000 0000            LeaderEpoch: 0
02                   Magic: 2
be2a b271            CRC: 3190469233    
0000                 Attributes:  
0000 000e            LastOffsetDelta: 14
0000 0185 7572 a380  FirstTimestamp: 1672712725376
0000 0185 7572 a387  MaxTimestamp: 1672712725383
ffff ffff ffff ffff  ProducerId: -1
ffff                 ProducerEpoch: -1
ffff ffff            BaseSequence: -1
0000 000f            recordCount: 15

中间为15条消息的内容,这里省略不分析,再之后,可以看到第二批消息的头信息:

kafka的消息持久化文件

同样,按照前面的格式可以分析可以得到:

0000 0000 0000 000f  BaseOffset:  15
0000 3cb8            Length: 15544
0000 0000            LeaderEpoch: 0
02                   Magic: 2
e3ff 46f2            CRC: 3825157874    
0000                 Attributes:  
0000 000e            LastOffsetDelta: 14
0000 0185 7572 a387  FirstTimestamp: 1672712725383
0000 0185 7572 a388  MaxTimestamp: 1672712725384
ffff ffff ffff ffff  ProducerId: -1
ffff                 ProducerEpoch: -1
ffff ffff            BaseSequence: -1
0000 000f            recordCount:     15

后面的内容可以依次类推。另外, 通过自带命令也能和实际解析的内容对得上:

[root@kafka-0 bin]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka/hncscwc-0/00000000000000000000.log
Dumping /opt/data/kafka/hncscwc-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 14 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1672712725383 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3190469233
baseOffset: 15 lastOffset: 29 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 15556 CreateTime: 1672712725384 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3825157874
baseOffset: 30 lastOffset: 44 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 31112 CreateTime: 1672712725385 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 2410094720
baseOffset: 45 lastOffset: 59 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 46668 CreateTime: 1672712725385 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3333194410
baseOffset: 60 lastOffset: 74 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 62224 CreateTime: 1672712725386 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 2980189431
baseOffset: 75 lastOffset: 89 count: 15 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 77780 CreateTime: 1672712725388 isvalid: true size: 15556 magic: 2 compresscodec: NONE crc: 3334415833
。。。

注:对于采用事务方式写入的数据,这里暂不举例说明。

2)*.index

该文件记录了消息在log文件中的起始偏移位置。其文件格式比较简单,由多个条目组成, 每个条目固定4字节的消息偏移量加固定4字节的文件偏移量。

kafka的消息持久化文件

实际文件内容示例如下图所示:

kafka的消息持久化文件

同样,通过自带命令也能和上述分析内容对上:

[root@kafka-0 bin]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka/hncscwc-0/00000000000000000000.index
Dumping /opt/data/kafka/hncscwc-0/00000000000000000000.index
offset: 29 position: 15556
offset: 44 position: 31112
offset: 59 position: 46668
offset: 74 position: 62224
offset: 89 position: 77780
offset: 104 position: 93336
offset: 119 position: 108892
offset: 134 position: 124448
offset: 149 position: 140004
offset: 164 position: 155560
offset: 179 position: 171116
offset: 194 position: 186672
offset: 209 position: 202228
offset: 224 position: 217784
offset: 239 position: 233340
offset: 254 position: 248896

3)*.timeindex

timestamp是从0.10版本开始引入的功能,每条消息都有一个对应的时间戳。生产者可以配置设置时间戳的类型,默认为创建时间(另外一个可选值是日志追加时间,即写入的时间)

该文件记录了不同时间戳对应的消息的偏移。文件格式和index一样,由多个条目组成,每个条目为固定8字节的时间戳加固定4字节的偏移量构成。这里就不再实际举例说明了。


小结一下,本文主要分析了kafka消息的持久化文件,以及具体的文件格式。由兴趣的朋友也可以对照分析下,对于kafka具体将消息写入的时机是怎样的,如何决定应该将消息写入新的segment。消息的读取逻辑又是怎样的,后续再结合源码进行剖析。

好了,这就是本文的全部内容,如果觉得本文对您有帮助,请点赞+转发,也欢迎加我微信交流~