“I usually solve problems by letting them devour me.”
― Franz Kafka
打不赢就加入他
消费者可以利用多台机器和进程来消费很多主题、进行负载平衡,视为一个消费者群组,一但当有消费者失败的时候,会自动分配给这个消费者群组中的其他成员,对於代理 ( broker ) 来说,每个消费者必须拥有一个在消费者群组中唯一的ID
const consumer = kafka.consumer({ groupId: 'my-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'topic-A' })
// 可以多次呼叫订阅方法
await consumer.subscribe({ topic: 'topic-B' })
await consumer.subscribe({ topic: 'topic-C' })
// 可以设定从第一则讯息开始读取
await consumer.subscribe({ topic: 'topic-D', fromBeginning: true })
RegExp
去订阅多个同系列的主题Alternatively, you can subscribe to multiple topics at once using a RegExp:
await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })
值得注意的是,消费者不会追踪订阅主题是否建立,所以如果今天有一个代理拥有主题 topic-1
和 topic-2
,但是你利用 RegExp
下了条件 /topic-.*/
去订阅主题後,主题 topic-3
在那之後创建出来,消费者并不会自动去新增订阅 topic-3
KafkaJS 提供了两种方式去处理消费的讯息
首先,eachmessage
提供一个最方便和简单的方式去使用消费者 API,它每次只读取一个讯息,他在实作在 eachBatch
之上,并且会依据你的设定自动的去提交 ( commmit )你的偏移量 ( offsets )、心跳监测 ( heartbeat),如果你的需求是单纯的,那使用 eachMessage
是个好选择
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
})
},
})
有一些使用上的需求需要直接以批次去处理讯息,eachBatch
提供了一些功能让你可以更方的的去实作,像是
resolveOffset
、heartbeat
、commitOffsetsIfNecessary
、uncommittedOffsets
、 isRunning
、isStale
,基本上所有处理完成的讯息,eachBatch
会自动去提供偏移量
注记:eachBatch
是更进阶的使用方式,因此你必须了解 session timeouts 和 heartbeats 是怎麽设定的
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
uncommittedOffsets,
isRunning,
isStale,
}) => {
for (let message of batch.messages) {
console.log({
topic: batch.topic,
partition: batch.partition,
highWatermark: batch.highWatermark,
message: {
offset: message.offset,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
}
})
resolveOffset(message.offset)
await heartbeat()
}
},
})
KafkaJS
会自动提教批次处理的最後一个偏移量heartbeatInterval
的设定值去跟代理传送心跳autoCommitInterval
和 autoCommitThreshold
的设定值,去提交偏移量,要注意的是,如果这个参数未被使用,那eachBatch
就不会自动提交偏移量范例:
consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
for (let message of batch.messages) {
if (!isRunning() || isStale()) break
await processMessage(message)
resolveOffset(message.offset)
await heartbeat()
}
}
})
上面这则例子,如果消费者在中途需要关闭,该批次剩余的讯息不会被消费、也不会提交偏移量,因此,你可以安心的关掉消费者,而不用担心会遗失讯息,当然如果消费者批次处理讯息时,遇到其他原因而中断,也不会去处理剩余的讯息。
<<: 【DAY 19】数据分析没有这麽难,透过 Microsoft Power BI ,让你事半功倍!(范例说明)
今天来跟大家介绍弱点扫描的基本操作 启动我们的Nessus後 请记得先到Settings确认目前的P...
在实作RESTful api时,会需要模拟实际用户使用你的api的情境,这时候postman就派得上...
用最简单的方式Ctrl+C&Ctrl+V把场景往上延伸,Ctrl+D也可以直接复制此物件,看...
在 Angular 的 Component 中有一个生命周期,当 Angular 实例化这个 Com...
图片来源:tooto1985/js-array-operations 内心剧场之胡言乱语 万能又好...