卡夫卡的藏书阁【Book22】- Kafka - KafkaJS 消费者 4

“Love is, that you are the knife which I plunge into myself.”
― Kafka, Franzv


重新设定偏移量 ( Seek )

消费者可以重新设定对於某个主题/分区的偏移量,只要使用方法 seek,这个方法可以在消费者被初始化且开始运作後被呼叫

await consumer.connect()
await consumer.subscribe({ topic: 'example' })

consumer.run({ eachMessage: async ({ topic, message }) => true })
consumer.seek({ topic: 'example', partition: 0, offset: 12384 })

如果在生效的批次中重新设定偏移量,会将该批次的讯息都标注为陈旧讯息并且弃用,这是为了去确保下一个消费的讯息是从重新设定的偏移量开始消费,所以在使用 eachBatch 方法时,请记得要用 isStale() 检查讯息是否是陈旧讯息
预设的情况下生产者会自动提交重新设定的偏移量,将生产者的自动提交(autoCommit)参数关闭可以避免这个行为

consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, message }) => true
})

consumer.seek({ topic: 'example', partition: 0, offset: "12384" })

客制化分区器 (Custom partition assigner)

KafkaJS 预设是按照顺序( round robin )去分配分区的,但其实可以自己客制化分区的逻辑给消费者群组使用
一个分区器是一个会回传带有介面的物件的方法,以下为范例

const MyPartitionAssigner = ({ cluster }) => ({
    name: 'MyPartitionAssigner',
    version: 1,
    async assign({ members, topics }) {},
    protocol({ topics }) {}
})

这方法必须针对每个主题的分区去回传分派计画,分派计画是一个由 memberId 列表和 memberAssignment 组成的,memberAssignment 必须用 MemberAssignment 加密,范例如下

const { AssignerProtocol: { MemberAssignment } } = require('kafkajs')

const MyPartitionAssigner = ({ cluster }) => ({
    version: 1,
    async assign({ members, topics }) {
        // perform assignment
        return myCustomAssignmentArray.map(memberId => ({
            memberId,
            memberAssignment: MemberAssignment.encode({
                version: this.version,
                assignment: assignment[memberId],
            })
        }))
    }
})

方法 protocol 要回传 namemetadatametadata 必须要用 MemberMetadata 去加密,以下为范例

const { AssignerProtocol: { MemberMetadata } } = require('kafkajs')

const MyPartitionAssigner = ({ cluster }) => ({
    name: 'MyPartitionAssigner',
    version: 1,
    protocol({ topics }) {
        return {
            name: this.name,
            metadata: MemberMetadata.encode({
            version: this.version,
            topics,
            }),
        }
    }
})

assigner 完成後将它加到分配器的清单中,这边的重点是要记得要设定预设的分配器去让旧的消费者可以使用

const { PartitionAssigners: { roundRobin } } = require('kafkajs')

kafka.consumer({
    groupId: 'my-group',
    partitionAssigners: [
        MyPartitionAssigner,
        roundRobin
    ]
})

Describe group

这个参数是新加入、实验阶段的参数,未来可能会被移除或是修改掉,这个参数会回传消费者群组的元资料

const data = await consumer.describeGroup()
// {
//  errorCode: 0,
//  groupId: 'consumer-group-id-f104efb0e1044702e5f6',
//  members: [
//    {
//      clientHost: '/172.19.0.1',
//      clientId: 'test-3e93246fe1f4efa7380a',
//      memberAssignment: Buffer,
//      memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
//      memberMetadata: Buffer,
//    },
//  ],
//  protocol: 'RoundRobinAssigner',
//  protocolType: 'consumer',
//  state: 'Stable',
// },

<<:  Day 22 - [API] 使用 PHP 执行 Python 脚本

>>:  [Day21] Remote Code Execution

Day 30 没credit也要把隐私规划做好

系统软件开发出身,後来转为SA、PM到产品服务架构师,後来专心研究隐私服务相关的规划,如同Day1开...

简单的 HelloWorld ~

今天本来要照着顺序来聊聊开发环境介绍的,但太枯燥了先跳过吧XD 想说点简单的,顺便为下一篇内容提出几...

Day2-Start to go

简介 Go 是由 Google 开发的程序语言,於2007发起,在2009正式推,2012年发布第一...

【第十天 - Two-pointer 介绍】

Q1. Two-pointer 是什麽? 我个人认为双指标 ( Two-pointer ) 比较像写...

Day 02:软件业界常用软件

前言 为什麽先介绍软件 在开始进入工程师的世界之前,先来了解工程师平常使用的工具。 就能开始用这些工...