卡夫卡的藏书阁【Book21】- Kafka - KafkaJS 消费者 3

“Now I can look at you in peace; I don't eat you any more.”
― Franz Kafka
肠胃支配者


参数 fromBeginning


消费者群组会依据最後提交的偏移量,从那个偏移量开始读取讯息,如果偏移量是无效 ( invalid ) 或是未定义 ( not defined ),那就会看参数 fromBeginning 的设定值来决定消费者群组怎麽动作

await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topic: 'other-topic', fromBeginning: false })

如果设定为 true 消费者会从最早的偏移量开始读取讯息,设定为 false 消费者会从最新的偏移量开始读取讯息

预设值是 false

参数选项:


kafka.consumer({
  groupId: <String>,
  partitionAssigners: <Array>,
  sessionTimeout: <Number>,
  rebalanceTimeout: <Number>,
  heartbeatInterval: <Number>,
  metadataMaxAge: <Number>,
  allowAutoTopicCreation: <Boolean>,
  maxBytesPerPartition: <Number>,
  minBytes: <Number>,
  maxBytes: <Number>,
  maxWaitTimeInMs: <Number>,
  retry: <Object>,
  maxInFlightRequests: <Number>,
  rackId: <String>
})
  • partitionAssigners
    • 分区器的清单
    • 预设值:[PartitionAssigners.roundRobin]
  • sessionTimeout
    • session 的 timeout 时间(毫秒),消费者会以固定周期传送心跳给代理 ( broker ),如果在 session timeout 时间超过前,代理都没有收到心跳,那代理会将这个消费者从消费者群组中移除,并且进行重新分配 ( rebalance )
    • 预设值:30000
  • rebalanceTimeout
    • 消费者协调器等待消费者重新加入消费者群组的最大等待时间(毫秒)
    • 预设值:60000
  • heartbeatInterval
    • 传送心跳给消费者协调气的时间间隔,心跳是用来确保消费者的session还活着,这个数值必须设定的比 session timeout 小
    • 预设值:3000
  • metadataMaxAge
    • 多久强迫更新元资料(metadata)一次,即使没有发生 partition leadership 的改变
    • 预设值:300000 (5 minutes)
  • allowAutoTopicCreation
    • 是否允许自动建立主题 ( topic ),如果消费者消费的主题尚未存在的话
    • 预设值:true
  • maxBytesPerPartition
    • 每个分区最大可以回传的讯息数量,这个大小至少必须跟单一个讯息的大小一样大,不然可能会发生生产者传送了大於消费者可以消费大小的讯息造成卡住
    • 预设值:1048576 (1MB)
  • minBytes
    • 每个消费要求最小的讯息回传量,如果没有达到这个最小讯息回传量,会等待 maxWaitTimeInMs 的设定时间去累积讯息量
    • 预设值:1
  • maxBytes
    • Kafka版本大於 0.10.1.0 支援这个参数,每次消费需求最大的讯息回传量
    • 预设值:10485760 (10MB)
  • maxWaitTimeInMs
    • 在资料数量满足 miniBytes 的最大等待资料搜寻的时间
    • 预设值:5000
  • retry
    • 重试机制,在生产者的章节有介绍过
    • 预设值:{ retries: 5 }
  • readUncommitted
    • 设定消费者的隔离程度,预设值设定为 false 消费者不会回传未提交偏移量的交易讯息
    • 预设值:false
  • maxInFlightRequests
    • 同时最多可以有几个消费请求,设定为 false 表示没有上限
    • 预设值:null (no limit)
  • rackId
    • 开启 rack 的 id
    • 预设值:null

暂停消费和重新开始消费 ( Pause & Resume )


消费者提供了暂停(pause)跟恢复(resume)两个方法,让消费者可以在消费一个或多个主题途中暂停或恢复消费,也有提供方法已暂停(paused)去取得目前暂停消费的主题清单
需要注意的是暂停消费一个主题,会是在下一个周期才生效,消费者还是可以读取当前这个批次的讯息

暂停一个消费者没有订阅的主题是不会动作的,恢复消费一个没有被暂停的主题也是不会动作的
另外,当消费者未启动时去暂停或是恢复消费都会喷错

范例,情境是消费者使用的外部依赖系统超过负载,我们可以暂停消费主题,等待一段时间後重新恢复消费主题

await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })

await consumer.run({ eachMessage: async ({ topic, message }) => {
    try {
        await sendToDependency(message)
    } catch (e) {
        if (e instanceof TooManyRequestsError) {
            consumer.pause([{ topic }])
            setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
        }

        throw e
    }
}})

这部分是可以做到更细微地控制的,可以暂停主题中的特定分区,而非只能暂停整个主题

举例来说,这样可以避免中断整个消费,只因为其中一个分区过於缓慢

consumer.run({
    partitionsConsumedConcurrently: 3, // Default: 1
    eachMessage: async ({ topic, partition, message }) => {
      // This will be called up to 3 times concurrently
        try {
            await sendToDependency(message)
        } catch (e) {
            if (e instanceof TooManyRequestsError) {
                consumer.pause([{ topic, partitions: [partition] }])
                // Other partitions will keep fetching and processing, until if / when
                // they also get throttled
                setTimeout(() => {
                    consumer.resume([{ topic, partitions: [partition] }])
                    // Other partitions that are paused will continue to be paused
                }, e.retryAfter * 1000)
            }

            throw e
        }
    },
})

可以呼叫 paused 方法去取得暂停的主题清单

const pausedTopicPartitions = consumer.paused()

for (const topicPartitions of pausedTopicPartitions) {
  const { topic, partitions } = topicPartitions
  console.log({ topic, partitions })
}

<<:  Day21 样式变化(动画)5

>>:  Day 21 - WooCommerce: 信用卡付款设定选项 (上)

Day 1 : 前言+本系列会使用到的东西(vscode、xampp、virtualbox、ubuntu、python安装说明)

前言: 大家好,这是我第一次参加铁人赛 主要是想记录一下自己学过的东西 并和大家分享一些我觉得很重要...

Day 16:CI / CD

前言 DevOps 是一种理念,目的是让开发到发布的速度、稳定性都能提升。 而 CI/CD 是实践 ...

【Day 27】Hook 07:useMemo

useMemo 用於性能优化,避免重复执行高效能的渲染 如果传入的参数未改变,就直接沿用上次的计算结...

[Day N] - 出书玩真的!出版罗~《IoT没那麽难!新手用 JavaScript 入门做自己的玩具!》

大家好,我是17King~ d(`・∀・)b 跟大家报告一个好消息! 我的书终於出版啦!!! (拍手...

欢迎进入 ip 的世界,Ruby 30 天刷题修行篇第十五话

嗨,我是 A Fei,来看看今天的题目: (题目来源:Codewars) Take the foll...