“As Gregor Samsa awoke one morning from uneasy dreams he found himself transformed in > his bed into a gigantic insect.”
― Franz Kafka, The Metamorphosis
取得主题的元资料
await admin.fetchTopicMetadata({ topics: <Array<String>> })
TopicsMetadata 的结构
{
topics: <Array<TopicMetadata>>,
}
TopicMetadata 的结构
{
topic: <String>,
partitions: <Array<PartitionMetadata>> // 预设值为 1
}
PartitionMetadata 的结构
{
partitionErrorCode: <Number>, // 预设值为 0
partitionId: <Number>,
leader: <Number>,
replicas: <Array<Number>>,
isr: <Array<Number>>,
}
如果主题清单中的任一个主题不存在的话 admin 会喷错,如果你忽略不带参数 topics 会预设读取所有的主题
await admin.fetchTopicMetadata()
admin 的方法 fetchTopicOffsets 回传最新一笔讯息的偏移量、高水位和低水位
await admin.fetchTopicOffsets(topic)
// [
// { partition: 0, offset: '31004', high: '31004', low: '421' },
// { partition: 1, offset: '54312', high: '54312', low: '3102' },
// { partition: 2, offset: '32103', high: '32103', low: '518' },
// { partition: 3, offset: '28', high: '28', low: '0' },
// ]
指定一个时间点,取得那个时间点最早一笔讯息的偏移量
await admin.fetchTopicOffsetsByTimestamp(topic, timestamp)
// [
// { partition: 0, offset: '3244' },
// { partition: 1, offset: '3113' },
// ]
fetchOffsets 会回传给定主题清单中的消费者群组的偏移量
await admin.fetchOffsets({ groupId, topics: ['topic1', 'topic2'] })
// [
// {
// topic: 'topic1',
// partitions: [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ],
// },
// {
// topic: 'topic2',
// partitions: [
// { partition: 0, offset: '1234' },
// { partition: 1, offset: '4567' },
// ],
// },
// ]
如果你想取得消费者群组所有有提交偏移量的主题,可以直接忽略不带参数 topics
使用可选参数 resolveOffsets 可以在不启动消费者的情况下取得该分区主题的偏移量,通常在呼叫方法 resetOffets (https://kafka.js.org/docs/next/admin#a-name-reset-offsets-a-reset-consumer-group-offsets) 之後使用
await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: false })
// [
// { partition: 0, offset: '-1' },
// { partition: 1, offset: '-1' },
// { partition: 2, offset: '-1' },
// { partition: 3, offset: '-1' },
// ]
await admin.resetOffsets({ groupId, topic })
await admin.fetchOffsets({ groupId, topic, resolveOffsets: true })
// [
// { partition: 0, offset: '31004' },
// { partition: 1, offset: '54312' },
// { partition: 2, offset: '32103' },
// { partition: 3, offset: '28' },
// ]
将消费者群组的偏移量重置为最早的偏移量或是最近一笔的偏移量 ( 预设值为最近一笔的偏移量 ),重置时必须确保消费者群组没有在运作中,否则此动作无法进行
await admin.resetOffsets({ groupId, topic }) // latest by default
// await admin.resetOffsets({ groupId, topic, earliest: true })
admin 的方法 setOffsets 可以让设定任意数值的偏移量给消费者
await admin.setOffsets({
groupId: <String>,
topic: <String>,
partitions: <SeekEntry[]>,
})
SeekEntry 的结构
{
partition: <Number>,
offset: <String>,
}
范例
await admin.setOffsets({
groupId: 'my-consumer-group',
topic: 'custom-topic',
partitions: [
{ partition: 0, offset: '35' },
{ partition: 3, offset: '19' },
]
})
合并使用方法 fetchTopicOffsetsByTimestamp 和 setOffsets 可以取得那个时间点最早一笔讯息的偏移量并设定消费者偏移量,重置时必须确保消费者群组没有在运作中,否则此动作无法进行
await admin.setOffsets({ groupId, topic, partitions: await admin.fetchTopicOffsetsByTimestamp(topic, timestamp) })
<<: Day 26 Serverless的运算服务-AWS Lambda
本篇重点在於告诉新手们,必懂得HTML标签、观念,并且推荐一些文章,让各位可以往此方向去了解 想当...
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 2...
目标 今天要来做的是倒数计时器 Step1 let countdown; function time...
// Thats proof of work 【前言】 今天终於来到这个 Project 测试的最...
本系列文章同步发布於笔者网站 昨天我们介绍了云端的五个必要条件,今天要接续昨天的云端定义,来介绍云端...