卡夫卡的藏书阁【Book20】- Kafka - KafkaJS 消费者 2

“I am in chains. Don't touch my chains.”
― Kafka, Franz
断开锁链、断开魂结


并发式的消费分区


预设的情况下 eachMessage 是按照顺序去消费分区 ( partition )中指定主题 ( topic ) 的讯息,如果想要一次同时并发取得多个讯息,你可以调高参数 partitionsConsumedConcurrently 的设定值

consumer.run({
    partitionsConsumedConcurrently: 3, // Default: 1
    eachMessage: async ({ topic, partition, message }) => {
        // This will be called up to 3 times concurrently
    },
})

在同一个分区的讯息是保证其顺序性的,但是其实是可以同时处理来自不同分区的多个讯息,如果 eachMessage 是包含有异步的工作内容,像是发起网路请求、硬碟I/O...等,那使用这个参数可以提高效能,但是如果 eachMessage 完成是同步的,那这个设定并不会有显着的影响

这个参数也可以使用在 eachBatch,在 eachBatch 加上参数 partitionsConsumedConcurrently,就可以一次并发处理多个批次的讯息

这边会建议设定参数 partitionsConsumedConcurrently 的数值不要超过你消费的分区数量,另外一个影响效能的地方是 CPU 的上限,建议不要设定超过你 CPU 颗数,如果是一开始使用就从比较低的数值开始设定测试、观察是否有效提高吞吐量

自动提交 ( autoCommit )


从 Kafka 取得讯息一定都是批次去取得讯息的,就算你使用 eachMessage 也是,而当整个批次都被消费处理完後,最後一个讯息的偏移量会被自动提交给 Kafka

周期性地去提交偏移量可以让消费者在从异常状况复原更容易,像是消费组的重新平衡、去除陈旧资料,但是过於频繁的提交偏移量会造成网路流量负担、降低处理速度,因此,自动提交提供了很有弹性的提交偏移量设定,有两种方式可供选择

首先,autoCommitInterval 消费者会以固定时间周期去提交偏移量,举例来说,每五秒提交一次,autoCommitInterval 的预设值是 null,单位毫秒 ( milliseconds )

consumer.run({
  autoCommitInterval: 5000,
  // ...
})

第二种方式是 autoCommitThreshold 消费者会在处理完固定数量的讯息後去提交偏移量,举例来说,每一百笔讯息提交一次,该参数的预设值是 null

consumer.run({
  autoCommitThreshold: 100,
  // ...
})

你可以同时使用这两个参数,是可行的,消费者会在满足其中一个条件 ( 时间或是数量 ) 时就提交偏移量

手动提交 ( Manual committing )


如果你将自动提交偏移量关闭 ( autoCommit ) 你还是可以进行手动提交,提交的方式有以下几种

  • 使用方法 commitOffsetsIfNecessaryeachBatch 的 callback,这参数仍然会使用 autoCommit 设定参数
  • 在交易 ( transaction ) 中提交偏移量
  • 使用消费者的 commitOffsets 方法来提交

生产者的 commitOffsets 方法是最底层的提交方式,它会忽略所有自动提交的相关设定值,使用这个方法可以自由选择要提交的偏移量、一次性的提交多个偏移量,这个在某些状况很有用处,像是制作重置工具,在 consumer.run 後将自动提交关闭,手动去提交偏移量,这边的偏移量是用来决定消费者要从分区的哪个位置开始消费讯息

consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
        // Process the message somehow
    },
})

consumer.commitOffsets([
  { topic: 'topic-A', partition: 0, offset: '1' },
  { topic: 'topic-A', partition: 1, offset: '3' },
  { topic: 'topic-B', partition: 0, offset: '2' }
])

值得注意的是你不一定要将消费者的偏移量存在 Kafka 之中,你可以选择将偏移量存在你想到的机器,像是 SQL 资料库,可以让消费者的设定 exactly once 更加有保障

存在 Kafka 之外偏移量的状妄

  • 在自动提交关闭的情况下使用消费者
  • 将讯息的偏移量+1後存起来,为了避免重复消费的问题
  • 重启後用 seek 去找到消费者

<<:  Day 20 - [语料库模型] 08-绘制语料库模型Heatmap图

>>:  Day29:复习 Channel、Flow

Day 19 你有设定「排除关键字」吗?

当然所有的广告推播,跟我们的众多设定或是目标都有关系,但是 Google 也不一定每次都能判断成功,...

Day7 开始使用Git

大家好我是乌木白,今天来和大家介绍如何使用Git,今天的内容会稍微多一点,请大家要多复习喔!! 全...

[第20天]理财达人Mx. Ada-Telegram Bot-echo测试

前言 本文说明使用Python建立Telegram Bot-echo测试 。 程序实作 from t...

[Day15] 优先性与相依性

当运算式有多个或多个类别的运算子时,我们会以运算子的 优先性 以及 相依性 来决定运算执行的顺序与方...