卡夫卡的藏书阁【Book19】- Kafka - KafkaJS 消费者 1

“I usually solve problems by letting them devour me.”
― Franz Kafka
打不赢就加入他


消费者

消费者可以利用多台机器和进程来消费很多主题、进行负载平衡,视为一个消费者群组,一但当有消费者失败的时候,会自动分配给这个消费者群组中的其他成员,对於代理 ( broker ) 来说,每个消费者必须拥有一个在消费者群组中唯一的ID

创建一个消费者
const consumer = kafka.consumer({ groupId: 'my-group' })
让消费者订阅一些主题
await consumer.connect()

await consumer.subscribe({ topic: 'topic-A' })

// 可以多次呼叫订阅方法
await consumer.subscribe({ topic: 'topic-B' })
await consumer.subscribe({ topic: 'topic-C' })

// 可以设定从第一则讯息开始读取
await consumer.subscribe({ topic: 'topic-D', fromBeginning: true })
当然,你也可以利用 RegExp 去订阅多个同系列的主题

Alternatively, you can subscribe to multiple topics at once using a RegExp:

await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })

值得注意的是,消费者不会追踪订阅主题是否建立,所以如果今天有一个代理拥有主题 topic-1topic-2,但是你利用 RegExp 下了条件 /topic-.*/ 去订阅主题後,主题 topic-3 在那之後创建出来,消费者并不会自动去新增订阅 topic-3


KafkaJS 提供了两种方式去处理消费的讯息

eachMessage

首先,eachmessage 提供一个最方便和简单的方式去使用消费者 API,它每次只读取一个讯息,他在实作在 eachBatch 之上,并且会依据你的设定自动的去提交 ( commmit )你的偏移量 ( offsets )、心跳监测 ( heartbeat),如果你的需求是单纯的,那使用 eachMessage 是个好选择

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            key: message.key.toString(),
            value: message.value.toString(),
            headers: message.headers,
        })
    },
})

eachBatch

有一些使用上的需求需要直接以批次去处理讯息,eachBatch 提供了一些功能让你可以更方的的去实作,像是
resolveOffsetheartbeatcommitOffsetsIfNecessaryuncommittedOffsetsisRunningisStale,基本上所有处理完成的讯息,eachBatch 会自动去提供偏移量

注记:eachBatch 是更进阶的使用方式,因此你必须了解 session timeouts 和 heartbeats 是怎麽设定的

await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async ({
        batch,
        resolveOffset,
        heartbeat,
        commitOffsetsIfNecessary,
        uncommittedOffsets,
        isRunning,
        isStale,
    }) => {
        for (let message of batch.messages) {
            console.log({
                topic: batch.topic,
                partition: batch.partition,
                highWatermark: batch.highWatermark,
                message: {
                    offset: message.offset,
                    key: message.key.toString(),
                    value: message.value.toString(),
                    headers: message.headers,
                }
            })

            resolveOffset(message.offset)
            await heartbeat()
        }
    },
})
  • eachBatchAutoResolve
    • 在每个批次完成後自动提供偏移量,如果设定为开启,KafkaJS会自动提教批次处理的最後一个偏移量
    • 预设值: true
  • batch.highWatermark
    • 该主题分区最一个被提交讯息的偏移量,主要是方便计算延迟
  • resolveOffset()
    • 用来标记批次中的讯息是否已经处理,如果出现错误,消费者会自动提交这些已处理讯息的偏移量
  • heartbeat(): Promise
    • 根据 heartbeatInterval 的设定值去跟代理传送心跳
  • commitOffsetsIfNecessary(offsets?): Promise
    • 根据 autoCommitIntervalautoCommitThreshold 的设定值,去提交偏移量,要注意的是,如果这个参数未被使用,那eachBatch 就不会自动提交偏移量
  • uncommittedOffsets()
    • 回传所有尚未提交的主题分区的偏移量
  • isRunning()
    • 回传消费者的运行状态,运行中回传 true,反之回传 false
  • isStale()
    • 回传批次处理的讯息是否已经过时、应该要舍弃掉

范例:

consumer.run({
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
        for (let message of batch.messages) {
            if (!isRunning() || isStale()) break
            await processMessage(message)
            resolveOffset(message.offset)
            await heartbeat()
        }
    }
})

上面这则例子,如果消费者在中途需要关闭,该批次剩余的讯息不会被消费、也不会提交偏移量,因此,你可以安心的关掉消费者,而不用担心会遗失讯息,当然如果消费者批次处理讯息时,遇到其他原因而中断,也不会去处理剩余的讯息。


<<:  【DAY 19】数据分析没有这麽难,透过 Microsoft Power BI ,让你事半功倍!(范例说明)

>>:  [DAY18] Use Case 设计概念

系统弱点扫描工具-Tenable Nessus(下)

今天来跟大家介绍弱点扫描的基本操作 启动我们的Nessus後 请记得先到Settings确认目前的P...

建立第一个RESTful api server(番外篇)-postman使用(Day13)

在实作RESTful api时,会需要模拟实际用户使用你的api的情境,这时候postman就派得上...

30天轻松学会unity自制游戏-往前移动

用最简单的方式Ctrl+C&Ctrl+V把场景往上延伸,Ctrl+D也可以直接复制此物件,看...

[Angular] Day5. Lifecycle hooks

在 Angular 的 Component 中有一个生命周期,当 Angular 实例化这个 Com...

就决定是你了 - 阵列系列I

图片来源:tooto1985/js-array-operations 内心剧场之胡言乱语 万能又好...