卡夫卡的藏书阁【Book25】- Kafka - KafkaJS Admin 2

“As Gregor Samsa awoke one morning from uneasy dreams he found himself transformed in > his bed into a gigantic insect.”
― Franz Kafka, The Metamorphosis


取得主题的元资料

await admin.fetchTopicMetadata({ topics: <Array<String>> })

TopicsMetadata 的结构

{
    topics: <Array<TopicMetadata>>,
}

TopicMetadata 的结构

{
    topic: <String>,
    partitions: <Array<PartitionMetadata>> // 预设值为 1
}

PartitionMetadata 的结构

{
    partitionErrorCode: <Number>, // 预设值为 0
    partitionId: <Number>,
    leader: <Number>,
    replicas: <Array<Number>>,
    isr: <Array<Number>>,
}

如果主题清单中的任一个主题不存在的话 admin 会喷错,如果你忽略不带参数 topics 会预设读取所有的主题

await admin.fetchTopicMetadata()

取得主题的偏移量资讯

admin 的方法 fetchTopicOffsets 回传最新一笔讯息的偏移量、高水位和低水位

await admin.fetchTopicOffsets(topic)
// [
//   { partition: 0, offset: '31004', high: '31004', low: '421' },
//   { partition: 1, offset: '54312', high: '54312', low: '3102' },
//   { partition: 2, offset: '32103', high: '32103', low: '518' },
//   { partition: 3, offset: '28', high: '28', low: '0' },
// ]

依据时间来取得偏移量

指定一个时间点,取得那个时间点最早一笔讯息的偏移量

await admin.fetchTopicOffsetsByTimestamp(topic, timestamp)
// [
//   { partition: 0, offset: '3244' },
//   { partition: 1, offset: '3113' },
// ]

取得消费者群组的偏移量

fetchOffsets 会回传给定主题清单中的消费者群组的偏移量

await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] })
// [
//   {
//     topic: 'topic1',
//     partitions: [
//       { partition: 0, offset: '31004' },
//       { partition: 1, offset: '54312' },
//       { partition: 2, offset: '32103' },
//       { partition: 3, offset: '28' },
//     ],
//   },
//   {
//     topic: 'topic2',
//     partitions: [
//       { partition: 0, offset: '1234' },
//       { partition: 1, offset: '4567' },
//     ],
//   },
// ]

如果你想取得消费者群组所有有提交偏移量的主题,可以直接忽略不带参数 topics
使用可选参数 resolveOffsets 可以在不启动消费者的情况下取得该分区主题的偏移量,通常在呼叫方法 resetOffets (https://kafka.js.org/docs/next/admin#a-name-reset-offsets-a-reset-consumer-group-offsets) 之後使用

await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
// [
//   { partition: 0, offset: '-1' },
//   { partition: 1, offset: '-1' },
//   { partition: 2, offset: '-1' },
//   { partition: 3, offset: '-1' },
// ]

await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: true })
// [
//   { partition: 0, offset: '31004' },
//   { partition: 1, offset: '54312' },
//   { partition: 2, offset: '32103' },
//   { partition: 3, offset: '28' },
// ]

重置消费者群组的偏移量

将消费者群组的偏移量重置为最早的偏移量或是最近一笔的偏移量 ( 预设值为最近一笔的偏移量 ),重置时必须确保消费者群组没有在运作中,否则此动作无法进行

await admin.resetOffsets({ groupId, topic }) // latest by default
// await admin.resetOffsets({ groupId, topic, earliest: true })

设定消费者群组的偏移量

admin 的方法 setOffsets 可以让设定任意数值的偏移量给消费者

await admin.setOffsets({
    groupId: <String>,
    topic: <String>,
    partitions: <SeekEntry[]>,
})

SeekEntry 的结构

{
    partition: <Number>,
    offset: <String>,
}

范例

await admin.setOffsets({
    groupId: 'my-consumer-group',
    topic: 'custom-topic',
    partitions: [
        { partition: 0, offset: '35' },
        { partition: 3, offset: '19' },
    ]
})

依照时间点去重置消费者群组的偏移量

合并使用方法 fetchTopicOffsetsByTimestamp 和 setOffsets 可以取得那个时间点最早一笔讯息的偏移量并设定消费者偏移量,重置时必须确保消费者群组没有在运作中,否则此动作无法进行

await admin.setOffsets({ groupId, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) })

<<:  Day 26 Serverless的运算服务-AWS Lambda

>>:  Day 24 bert 文字情感分类-3

第2车厢-砸重金才懂得:HTML标签

本篇重点在於告诉新手们,必懂得HTML标签、观念,并且推荐一些文章,让各位可以往此方向去了解 想当...

失败了 还是可以进行更新

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 2...

Day30 -- Countdown Clock

目标 今天要来做的是倒数计时器 Step1 let countdown; function time...

Day 28【Deploy NFT - Deploy the Lazy Mint in Website】Vitalik Buterin mining Ethereum

// Thats proof of work 【前言】 今天终於来到这个 Project 测试的最...

云端定义 2

本系列文章同步发布於笔者网站 昨天我们介绍了云端的五个必要条件,今天要接续昨天的云端定义,来介绍云端...