Kafka's Library【Book3】- Kafka Records and Topics

Watching the midnight Apple keynote, I was reminded of Jobs' line "Stay Hungry. Stay Foolish."
"Youth is happy because it has the capacity to see beauty. Anyone who keeps the ability to see beauty never grows old."
Franz Kafka

Hard to believe I'm almost thirty.


3.1 A Single Message in Kafka ( Record )

https://ithelp.ithome.com.tw/upload/images/20210907/20140255DCsxmBw5zg.png

Let's start from the smallest unit, a single record. Every record in a Kafka topic contains a key, a value, and a timestamp ( key, value, timestamp ).

  • Key ( key ): optional. If no key is set, records are by default distributed evenly across all partitions; if a key is set, it can be used to decide which partition to write to. The most common approach is to hash the key and take the remainder after dividing by the number of partitions, assigning the record to the partition with that number.

  • Timestamp ( timestamp )

    1. An index for fetching data: normally Kafka is consumed like a traditional MQ, popping records in real time as they arrive, but you can also use a timestamp to look up the offset at a given point in time and start fetching from there.
    2. The basis for deletion: by default, Kafka deletes data older than seven days.
    3. The basis for segmentation: by default, Kafka also rolls its log files every seven days.
  • Value ( value ): usually stored in JSON ( JavaScript Object Notation ) format. It is generally recommended to add some consistent labels or descriptions so the messages are more readable and easier to understand.
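The key-to-partition rule above can be sketched in a few lines of Python. Note that this is only an illustrative model: Kafka's real DefaultPartitioner hashes keys with murmur2, and keyless records are nowadays spread with a "sticky" strategy rather than strict round-robin; the md5 hash and counter here are stand-ins.

```python
import hashlib
from itertools import count

def pick_partition(key, num_partitions, _rr=count()):
    """Toy model of Kafka's partition assignment (not the real murmur2)."""
    if key is None:
        # No key: spread records across partitions (plain round-robin here;
        # modern Kafka uses a "sticky" variant of the same idea)
        return next(_rr) % num_partitions
    # With a key: hash it and take the remainder, so the same key always
    # maps to the same partition (and therefore stays in order)
    digest = int.from_bytes(hashlib.md5(key).digest()[:8], "big")
    return digest % num_partitions

# The same key always lands on the same partition:
p = pick_partition(b"order-42", 3)
assert p == pick_partition(b"order-42", 3) and 0 <= p < 3
```

This is also why keyed records preserve per-key ordering: ordering in Kafka is guaranteed only within a partition, and a stable hash pins each key to one partition.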

Records could be sent one at a time, but at high volume that burns a lot of network overhead, so Kafka writes in batches. Batching necessarily adds some write latency, which is a trade-off to weigh per use case: is I/O throughput more important, or low latency?
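To make that trade-off concrete, here is a toy in-memory sketch (the name MicroBatcher is invented for illustration and is not a Kafka API): a buffer flushes either when it is full, akin to the producer's batch.size, or when the oldest record has waited long enough, akin to linger.ms.

```python
import time

class MicroBatcher:
    """Toy sketch of producer-side batching: records pile up in a buffer
    and are flushed when the batch is full (cf. batch.size) or when the
    oldest record has lingered long enough (cf. linger.ms)."""

    def __init__(self, max_records, linger_seconds, send):
        self.max_records = max_records
        self.linger_seconds = linger_seconds
        self.send = send              # callback that ships one whole batch
        self.buffer = []
        self.first_append = None

    def append(self, record):
        if self.first_append is None:
            self.first_append = time.monotonic()
        self.buffer.append(record)
        if len(self.buffer) >= self.max_records:
            self.flush()              # full batch: no need to wait longer

    def maybe_flush(self):
        # Called periodically by an I/O loop: flush a partial batch once
        # it has waited longer than the linger window.
        if self.buffer and time.monotonic() - self.first_append >= self.linger_seconds:
            self.flush()

    def flush(self):
        self.send(list(self.buffer))
        self.buffer.clear()
        self.first_append = None

sent = []
b = MicroBatcher(max_records=3, linger_seconds=0.05, send=sent.append)
for v in ("a", "b", "c"):
    b.append(v)                       # third append fills and flushes the batch
assert sent == [["a", "b", "c"]]
```

A larger batch size amortizes network cost over more records (better throughput); a longer linger window delays delivery (worse latency) — exactly the tension described above.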

3.2 Let's See What a Message ( log ) Actually Looks Like

Kafka's built-in kafka-run-class kafka.tools.DumpLogSegments command displays the contents of a log file. --files is required and accepts multiple files separated by commas. The commands are where the real substance is; they will be covered in detail later.

The point here is mainly to show what metadata a record carries; what you generally need to know are the key, value, timestamp, and offset mentioned above.

You can see that Kafka first accumulates records into a batch and sends them once enough have piled up. Taking this first line as an example, three records were accumulated ( count: 3 ) and sent together.

baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
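Since each batch header is a flat run of `name: value` pairs, it is easy to pull apart with a small script when eyeballing dump output. This parser is just a convenience sketch, not part of Kafka's tooling:

```python
def parse_batch_header(line):
    """Split a DumpLogSegments batch line ('name: value name: value ...')
    into a dict. This works because neither names nor values contain
    spaces in this output format."""
    tokens = line.split()
    return {k.rstrip(":"): v for k, v in zip(tokens[::2], tokens[1::2])}

header = parse_batch_header(
    "baseOffset: 0 lastOffset: 2 count: 3 size: 98 "
    "compresscodec: NONE crc: 16374966 isvalid: true"
)
assert header["count"] == "3" and header["isvalid"] == "true"
```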

Below are the log file contents of the three partitions of topic topic_for_test_log.

At this level you cannot see the actual message contents, only the wrapped batches.

$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log,broker1/topic_for_test_log-1/00000000000000000000.log,broker2/topic_for_test_log-2/00000000000000000000.log

Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
baseOffset: 5 lastOffset: 10 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 179 CreateTime: 1631771625952 size: 128 magic: 2 compresscodec: NONE crc: 3083079563 isvalid: true
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true


Dumping broker1/topic_for_test_log-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771626645 size: 95 magic: 2 compresscodec: NONE crc: 1577038537 isvalid: true
baseOffset: 3 lastOffset: 9 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 95 CreateTime: 1631771633592 size: 135 magic: 2 compresscodec: NONE crc: 372605860 isvalid: true
baseOffset: 10 lastOffset: 15 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 230 CreateTime: 1631771635665 size: 122 magic: 2 compresscodec: NONE crc: 2359930090 isvalid: true


Dumping broker2/topic_for_test_log-2/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 5 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771620918 size: 124 magic: 2 compresscodec: NONE crc: 2581760372 isvalid: true
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 124 CreateTime: 1631771627429 size: 69 magic: 2 compresscodec: NONE crc: 3572829527 isvalid: true
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 193 CreateTime: 1631771635834 size: 69 magic: 2 compresscodec: NONE crc: 1038836103 isvalid: true

To see every individual record inside each batch (the payload field), add the --print-data-log parameter; then the actual records become visible.

$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log --print-data-log

Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
| offset: 0 isValid: true crc: null keySize: -1 valueSize: 7 CreateTime: 1631771618877 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf as
| offset: 1 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771619471 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: sdf
| offset: 2 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771619770 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
| offset: 3 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771621106 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: as
| offset: 4 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771621294 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: dfa
...
...
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true
| offset: 11 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771633791 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: asdf
...
...
| offset: 15 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771634475 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: fa
| offset: 16 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771634662 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: dsf

3.3 What Exactly Is Stored in a Record's Metadata

We have seen the records; now, what do the innermost metadata fields actually mean? The parameters are introduced one by one below. Later chapters will have concrete examples that use and affect these parameters; for now, a rough impression is enough.

  • baseOffset: 0: the starting offset of this batch
  • lastOffset: 2: the last offset of this batch
  • count: 3: how many records this batch contains
  • baseSequence: -1: only meaningful when the producer's idempotence setting ( enable.idempotence ) is true; otherwise -1
  • lastSequence: -1: likewise only meaningful with producer idempotence enabled; otherwise -1
  • producerId: -1: the producer's ID; -1 here because the data was produced with the built-in console shell
  • producerEpoch: -1: the producer's epoch number; -1 here for the same reason
  • partitionLeaderEpoch: 0: the epoch of the partition leader at the time this batch was written
  • isTransactional: false: whether the batch is transactional
  • isControl: false
    • a control batch carries a control record that lets consumers decide whether this batch belongs to an aborted transaction
  • CreateTime: 1631771619770: with the default log.message.timestamp.type=CreateTime, this is the timestamp the producer set when the record was created ( with LogAppendTime it would instead be when the partition leader appended the batch )
  • size: 98: the size of this batch
  • magic: 2: the message format version is V2. V0 was the original format, V1 ( introduced in Kafka 0.10 ) added timestamps, and V2 was introduced in 0.11 and is what current versions use.
  • compresscodec: NONE
    • the default compression is NONE; enabling compression is generally recommended
    • 0: None, 1: Gzip, 2: Snappy, 3: Lz4, 4: Zstd

  • crc: 16374966
    • used to verify that the data was not corrupted, for example by an interruption while it was being written to disk
    • it inevitably adds some overhead; if you are chasing maximum performance, you can consider turning the consumer-side verification off ( the consumer's check.crcs setting )

Below is the on-disk data format of a RecordBatch:

		baseOffset: int64
		batchLength: int32
		partitionLeaderEpoch: int32
		magic: int8 (current magic value is 2)
		crc: int32
		attributes: int16
			bit 0~2:
				0: no compression
				1: gzip
				2: snappy
				3: lz4
				4: zstd
			bit 3: timestampType
			bit 4: isTransactional (0 means not transactional)
			bit 5: isControlBatch (0 means not a control batch)
			bit 6~15: unused
		lastOffsetDelta: int32
		firstTimestamp: int64
		maxTimestamp: int64
		producerId: int64
		producerEpoch: int16
		baseSequence: int32
		records: [Record]

The CRC covers the data from attributes to the end of the batch. The CRC field sits after magic because the magic value must be parsed first in order to decide how to interpret the bytes between batchLength and magic. The partition leader epoch is also excluded from the CRC range, to avoid having to recompute the CRC every time a broker reassigns that field. The CRC here is computed with CRC-32C.
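CRC-32C (Castagnoli) differs from the plain CRC-32 found in zlib only in its polynomial. A bit-by-bit reference implementation is short enough to show here; a real system would use a table-driven or hardware-accelerated version:

```python
def crc32c(data):
    """Reference CRC-32C (Castagnoli): reflected algorithm, reversed
    polynomial 0x82F63B78, initial value and final XOR both 0xFFFFFFFF."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            # Shift one bit; XOR in the polynomial when a bit falls off
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF

# 0xE3069283 is the standard CRC-32C check value for "123456789"
assert crc32c(b"123456789") == 0xE3069283
```

Anything altered between attributes and the end of the batch changes this checksum, which is how a broker or consumer detects a truncated or corrupted batch.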


  • isvalid: true

This parameter was added in Kafka 2.4.0, mainly to report the cause of failures more clearly, such as a message format ( magic ) mismatch or a CRC checksum error; the detailed error cause is returned via the future object RecordMetadata.

