“I am in chains. Don't touch my chains.”
― Kafka, Franz
断开锁链、断开魂结
预设的情况下 eachMessage
是按照顺序去消费分区 ( partition )中指定主题 ( topic ) 的讯息,如果想要一次同时并发取得多个讯息,你可以调高参数 partitionsConsumedConcurrently
的设定值
consumer.run({
partitionsConsumedConcurrently: 3, // Default: 1
eachMessage: async ({ topic, partition, message }) => {
// This will be called up to 3 times concurrently
},
})
在同一个分区的讯息是保证其顺序性的,但是其实是可以同时处理来自不同分区的多个讯息,如果 eachMessage
是包含有异步的工作内容,像是发起网路请求、硬碟I/O...等,那使用这个参数可以提高效能,但是如果 eachMessage
完成是同步的,那这个设定并不会有显着的影响
这个参数也可以使用在 eachBatch
,在 eachBatch
加上参数 partitionsConsumedConcurrently
,就可以一次并发处理多个批次的讯息
这边会建议设定参数 partitionsConsumedConcurrently
的数值不要超过你消费的分区数量,另外一个影响效能的地方是 CPU 的上限,建议不要设定超过你 CPU 颗数,如果是一开始使用就从比较低的数值开始设定测试、观察是否有效提高吞吐量
从 Kafka 取得讯息一定都是批次去取得讯息的,就算你使用 eachMessage
也是,而当整个批次都被消费处理完後,最後一个讯息的偏移量会被自动提交给 Kafka
周期性地去提交偏移量可以让消费者在从异常状况复原更容易,像是消费组的重新平衡、去除陈旧资料,但是过於频繁的提交偏移量会造成网路流量负担、降低处理速度,因此,自动提交提供了很有弹性的提交偏移量设定,有两种方式可供选择
首先,autoCommitInterval
消费者会以固定时间周期去提交偏移量,举例来说,每五秒提交一次,autoCommitInterval
的预设值是 null,单位毫秒 ( milliseconds )
consumer.run({
autoCommitInterval: 5000,
// ...
})
第二种方式是 autoCommitThreshold
消费者会在处理完固定数量的讯息後去提交偏移量,举例来说,每一百笔讯息提交一次,该参数的预设值是 null
consumer.run({
autoCommitThreshold: 100,
// ...
})
你可以同时使用这两个参数,是可行的,消费者会在满足其中一个条件 ( 时间或是数量 ) 时就提交偏移量
如果你将自动提交偏移量关闭 ( autoCommit ) 你还是可以进行手动提交,提交的方式有以下几种
commitOffsetsIfNecessary
在 eachBatch
的 callback,这参数仍然会使用 autoCommit
设定参数commitOffsets
方法来提交生产者的 commitOffsets
方法是最底层的提交方式,它会忽略所有自动提交的相关设定值,使用这个方法可以自由选择要提交的偏移量、一次性的提交多个偏移量,这个在某些状况很有用处,像是制作重置工具,在 consumer.run
後将自动提交关闭,手动去提交偏移量,这边的偏移量是用来决定消费者要从分区的哪个位置开始消费讯息
consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
// Process the message somehow
},
})
consumer.commitOffsets([
{ topic: 'topic-A', partition: 0, offset: '1' },
{ topic: 'topic-A', partition: 1, offset: '3' },
{ topic: 'topic-B', partition: 0, offset: '2' }
])
值得注意的是你不一定要将消费者的偏移量存在 Kafka 之中,你可以选择将偏移量存在你想到的机器,像是 SQL 资料库,可以让消费者的设定 exactly once
更加有保障
存在 Kafka 之外偏移量的状妄
<<: Day 20 - [语料库模型] 08-绘制语料库模型Heatmap图
当然所有的广告推播,跟我们的众多设定或是目标都有关系,但是 Google 也不一定每次都能判断成功,...
大家好我是乌木白,今天来和大家介绍如何使用Git,今天的内容会稍微多一点,请大家要多复习喔!! 全...
前言 本文说明使用Python建立Telegram Bot-echo测试 。 程序实作 from t...
当运算式有多个或多个类别的运算子时,我们会以运算子的 优先性 以及 相依性 来决定运算执行的顺序与方...