“Love is, that you are the knife which I plunge into myself.”
― Kafka, Franzv
消费者可以重新设定对於某个主题/分区的偏移量,只要使用方法 seek
,这个方法可以在消费者被初始化且开始运作後被呼叫
await consumer.connect()
await consumer.subscribe({ topic: 'example' })
consumer.run({ eachMessage: async ({ topic, message }) => true })
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })
如果在生效的批次中重新设定偏移量,会将该批次的讯息都标注为陈旧讯息并且弃用,这是为了去确保下一个消费的讯息是从重新设定的偏移量开始消费,所以在使用 eachBatch 方法时,请记得要用 isStale() 检查讯息是否是陈旧讯息
预设的情况下生产者会自动提交重新设定的偏移量,将生产者的自动提交(autoCommit)参数关闭可以避免这个行为
consumer.run({
autoCommit: false,
eachMessage: async ({ topic, message }) => true
})
consumer.seek({ topic: 'example', partition: 0, offset: "12384" })
KafkaJS 预设是按照顺序( round robin )去分配分区的,但其实可以自己客制化分区的逻辑给消费者群组使用
一个分区器是一个会回传带有介面的物件的方法,以下为范例
const MyPartitionAssigner = ({ cluster }) => ({
name: 'MyPartitionAssigner',
version: 1,
async assign({ members, topics }) {},
protocol({ topics }) {}
})
这方法必须针对每个主题的分区去回传分派计画,分派计画是一个由 memberId
列表和 memberAssignment
组成的,memberAssignment
必须用 MemberAssignment
加密,范例如下
const { AssignerProtocol: { MemberAssignment } } = require('kafkajs')
const MyPartitionAssigner = ({ cluster }) => ({
version: 1,
async assign({ members, topics }) {
// perform assignment
return myCustomAssignmentArray.map(memberId => ({
memberId,
memberAssignment: MemberAssignment.encode({
version: this.version,
assignment: assignment[memberId],
})
}))
}
})
方法 protocol
要回传 name
和 metadata
,metadata
必须要用 MemberMetadata
去加密,以下为范例
const { AssignerProtocol: { MemberMetadata } } = require('kafkajs')
const MyPartitionAssigner = ({ cluster }) => ({
name: 'MyPartitionAssigner',
version: 1,
protocol({ topics }) {
return {
name: this.name,
metadata: MemberMetadata.encode({
version: this.version,
topics,
}),
}
}
})
当 assigner
完成後将它加到分配器的清单中,这边的重点是要记得要设定预设的分配器去让旧的消费者可以使用
const { PartitionAssigners: { roundRobin } } = require('kafkajs')
kafka.consumer({
groupId: 'my-group',
partitionAssigners: [
MyPartitionAssigner,
roundRobin
]
})
这个参数是新加入、实验阶段的参数,未来可能会被移除或是修改掉,这个参数会回传消费者群组的元资料
const data = await consumer.describeGroup()
// {
// errorCode: 0,
// groupId: 'consumer-group-id-f104efb0e1044702e5f6',
// members: [
// {
// clientHost: '/172.19.0.1',
// clientId: 'test-3e93246fe1f4efa7380a',
// memberAssignment: Buffer,
// memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
// memberMetadata: Buffer,
// },
// ],
// protocol: 'RoundRobinAssigner',
// protocolType: 'consumer',
// state: 'Stable',
// },
<<: Day 22 - [API] 使用 PHP 执行 Python 脚本
>>: [Day21] Remote Code Execution
系统软件开发出身,後来转为SA、PM到产品服务架构师,後来专心研究隐私服务相关的规划,如同Day1开...
今天本来要照着顺序来聊聊开发环境介绍的,但太枯燥了先跳过吧XD 想说点简单的,顺便为下一篇内容提出几...
简介 Go 是由 Google 开发的程序语言,於2007发起,在2009正式推,2012年发布第一...
Q1. Two-pointer 是什麽? 我个人认为双指标 ( Two-pointer ) 比较像写...
前言 为什麽先介绍软件 在开始进入工程师的世界之前,先来了解工程师平常使用的工具。 就能开始用这些工...