半夜的苹果发表会,想起了贾伯斯的那句“Stay Hungry. Stay Foolish”
“Youth is happy because it has the capacity to see beauty. Anyone who keeps the ability to see beauty never grows old.”
― Franz Kafka
转眼也快奔三了啊
这边会先从最小的一笔讯息可以介绍起,Kafka 中主题的每一笔讯息都是包含索引键值 ( key-value ) 和时间戳 ( timstamp )。
键 ( key ) 可以不用设定、不设定的情况下预设会自动将讯息平均发配所有分区上,如果有设定可以用来当作写入哪个分区的依据,一般最常见的做法是将杂凑值 ( hash ) 除以分区的数量得出余数,将讯息分配给相对应编号的分区。
时间戳 ( timestamp )
log
档案切分值 ( value ) 的部分,通常是以 JSON (Javascript Object Notation) 的格式储存,通常建议可以统一加上一些标示或描述让讯息可读性更高、更容易理解。
讯息可以一笔笔的传送,但是在大数量的情境下会消耗掉很大量的网路传输成本,因此 Kafka 是批次写入的,但是批次写入一定会造成写入的延迟性,这必须视情况下去考量,看使用场景是 I/O 重要、还是低延迟比较重要。
用Kafka
内建的kafka-run-class kafka.tools.DumpLogSegments
指令可以查看log
档案内容,—file
为必带、用逗号隔开可查询多个档案,指令是精华後面会再详细介绍。
这边主要是先让大家看一下,讯息的 metadata
有哪些,一般需要知道就是上面提到的索引键、值、时间戳和偏移量。
可以看到 Kafka
是会先讯息累积到 batch
中,等累积达一定量再送出,先以这一笔来看,就是累积了三笔 ( count: 3 ) 一次送出。
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
以下是主题 topic_for_test_lot
三个分区的 log
档案内容
但是这边看不到讯息实际的内容,只有一个个包起来的批次
$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log,broker1/topic_for_test_log-1/00000000000000000000.log,broker2/topic_for_test_log-2/00000000000000000000.log
Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
baseOffset: 5 lastOffset: 10 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 179 CreateTime: 1631771625952 size: 128 magic: 2 compresscodec: NONE crc: 3083079563 isvalid: true
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true
Dumping broker1/topic_for_test_log-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771626645 size: 95 magic: 2 compresscodec: NONE crc: 1577038537 isvalid: true
baseOffset: 3 lastOffset: 9 count: 7 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 95 CreateTime: 1631771633592 size: 135 magic: 2 compresscodec: NONE crc: 372605860 isvalid: true
baseOffset: 10 lastOffset: 15 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 230 CreateTime: 1631771635665 size: 122 magic: 2 compresscodec: NONE crc: 2359930090 isvalid: true
Dumping broker2/topic_for_test_log-2/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 5 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771620918 size: 124 magic: 2 compresscodec: NONE crc: 2581760372 isvalid: true
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 124 CreateTime: 1631771627429 size: 69 magic: 2 compresscodec: NONE crc: 3572829527 isvalid: true
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 193 CreateTime: 1631771635834 size: 69 magic: 2 compresscodec: NONE crc: 1038836103 isvalid: true
如果想要看到每个 batch
中的每一笔讯息内容(payload
栏位),还需要加上参数--print-data-log
,这样就可以看到真实的记录了
$ kafka-run-class kafka.tools.DumpLogSegments --files broker1/topic_for_test_log-0/00000000000000000000.log --print-data-log
Dumping broker1/topic_for_test_log-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1631771619770 size: 98 magic: 2 compresscodec: NONE crc: 16374966 isvalid: true
| offset: 0 isValid: true crc: null keySize: -1 valueSize: 7 CreateTime: 1631771618877 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf as
| offset: 1 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771619471 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: sdf
| offset: 2 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771619770 baseOffset: 0 lastOffset: 2 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 98 magic: 2 compressType: NONE position: 0 sequence: -1 headerKeys: [] payload: asdf
baseOffset: 3 lastOffset: 4 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 98 CreateTime: 1631771621294 size: 81 magic: 2 compresscodec: NONE crc: 487960023 isvalid: true
| offset: 3 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771621106 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: as
| offset: 4 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771621294 baseOffset: 3 lastOffset: 4 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 81 magic: 2 compressType: NONE position: 98 sequence: -1 headerKeys: [] payload: dfa
...
...
baseOffset: 11 lastOffset: 16 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 307 CreateTime: 1631771634662 size: 124 magic: 2 compresscodec: NONE crc: 1152822458 isvalid: true
| offset: 11 isValid: true crc: null keySize: -1 valueSize: 4 CreateTime: 1631771633791 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: asdf
...
...
| offset: 15 isValid: true crc: null keySize: -1 valueSize: 2 CreateTime: 1631771634475 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: fa
| offset: 16 isValid: true crc: null keySize: -1 valueSize: 3 CreateTime: 1631771634662 baseOffset: 11 lastOffset: 16 baseSequence: -1 lastSequence: -1 producerEpoch: -1 partitionLeaderEpoch: 0 batchSize: 124 magic: 2 compressType: NONE position: 307 sequence: -1 headerKeys: [] payload: dsf
纪录是看到了,那到底储存在最深处的元资料代表了什麽,以下一个一个参数为大家初步介绍,後续篇章都会有近一步实际范例去使用、影响这些参数,目前只需要有个印象就足够了。
baseOffset: 0
: 该批次的起始偏移量lastOffset: 2
: 该批次结束的偏移量count: 3
: 该批次的纪录有几笔baseSequence: -1
: 需要设定消费者设定 idempotent
参数为真才会生效,否则为 -1lastSequence: -1
: 需要设定消费者设定 idempotent
参数为真才会生效,否则为 -1producerId: -1
: 生产者的编号,这边因为使用内建的shell
去新增资料所以为 -1producerEpoch: -1
: 生产者的 Epoch 编号,这边因为使用内建的shell
去新增资料所以为 -1partitionLeaderEpoch: 0
: 该笔资料的 partition leader
是谁isTransactional: false
: 是否为事务isControl: false
control batch
会有一笔 control record
是用来让消费者判断这个批次是否为失败的事务CreateTime: 1631771619770
: Partition Leader
收到讯息的时间点size: 98
: 该批次的大小magic: 2
=> 代表讯息的版本是 V2,Apache Kafka 在版本0.8之前支援 old consumer
和 old producer
也就是 V1版本,而现行版本都是使用 V2。compresscodec: NONE
NONE
,建议是都要开启crc: 16374966
baseOffset: int64
batchLength: int32
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
CRC
验证的范围是从attributes
一直到这个batch
的尾端,CRC 位置是在 magic
之後,这是因为必须先解析magic
类型才能决定要怎麽解析batch length
到 magic
之间的 byte
,而 partition leader epoch
也不包含在 CRC
验证的区间,这是为了避免这个栏位值在被 broker
重新分配时 CRC
必须要重新计算,这里 CRC
是使用 CRC-32C
来计算。
isvalid: true
这个参数是 Kafka
2.4.0版本新增的,主要是为了更清楚地显示错误的发生原因,像是版本 ( magic
) 对应错误、CRC
的 checksum
错误...等,会藉由 future
物件 RecordMetadata
将详细的错误原因回传。
资料来源:
<<: Day-3: Rails的Route + MVC架构
▲ AR12-TUF采用导热管与CPU直接接触的热导管直触技术(HDC),4根Ø 6mm全铜热导管...
今天要介绍的 Tip 是有关於 pipe 的 pure 与 impure,当没有任何额外的设定下,自...
前情提要 一般在用 Xcode 创新专案的时候,会预设使用 Main.storyboard 来作为我...
前面的文章都是在介绍Pipeline介面、范本内容,这一篇终於要真正进入正题,将Repo中的Cons...
继承昨日的漫谈,当一个专案已经开始拆解步骤和预期的交付产出,同时专案经理就会心里琢磨着「这件工作要找...