“Now I can look at you in peace; I don't eat you any more.”
― Franz Kafka
肠胃支配者
fromBeginning
:消费者群组会依据最後提交的偏移量,从那个偏移量开始读取讯息,如果偏移量是无效 ( invalid ) 或是未定义 ( not defined ),那就会看参数 fromBeginning
的设定值来决定消费者群组怎麽动作
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.subscribe({ topic: 'other-topic', fromBeginning: false })
如果设定为 true
消费者会从最早的偏移量开始读取讯息,设定为 false
消费者会从最新的偏移量开始读取讯息
预设值是 false
kafka.consumer({
groupId: <String>,
partitionAssigners: <Array>,
sessionTimeout: <Number>,
rebalanceTimeout: <Number>,
heartbeatInterval: <Number>,
metadataMaxAge: <Number>,
allowAutoTopicCreation: <Boolean>,
maxBytesPerPartition: <Number>,
minBytes: <Number>,
maxBytes: <Number>,
maxWaitTimeInMs: <Number>,
retry: <Object>,
maxInFlightRequests: <Number>,
rackId: <String>
})
消费者提供了暂停(pause)跟恢复(resume)两个方法,让消费者可以在消费一个或多个主题途中暂停或恢复消费,也有提供方法已暂停(paused)去取得目前暂停消费的主题清单
需要注意的是暂停消费一个主题,会是在下一个周期才生效,消费者还是可以读取当前这个批次的讯息
暂停一个消费者没有订阅的主题是不会动作的,恢复消费一个没有被暂停的主题也是不会动作的
另外,当消费者未启动时去暂停或是恢复消费都会喷错
范例,情境是消费者使用的外部依赖系统超过负载,我们可以暂停消费主题,等待一段时间後重新恢复消费主题
await consumer.connect()
await consumer.subscribe({ topic: 'jobs' })
await consumer.run({ eachMessage: async ({ topic, message }) => {
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
consumer.pause([{ topic }])
setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000)
}
throw e
}
}})
这部分是可以做到更细微地控制的,可以暂停主题中的特定分区,而非只能暂停整个主题
举例来说,这样可以避免中断整个消费,只因为其中一个分区过於缓慢
consumer.run({
partitionsConsumedConcurrently: 3, // Default: 1
eachMessage: async ({ topic, partition, message }) => {
// This will be called up to 3 times concurrently
try {
await sendToDependency(message)
} catch (e) {
if (e instanceof TooManyRequestsError) {
consumer.pause([{ topic, partitions: [partition] }])
// Other partitions will keep fetching and processing, until if / when
// they also get throttled
setTimeout(() => {
consumer.resume([{ topic, partitions: [partition] }])
// Other partitions that are paused will continue to be paused
}, e.retryAfter * 1000)
}
throw e
}
},
})
可以呼叫 paused 方法去取得暂停的主题清单
const pausedTopicPartitions = consumer.paused()
for (const topicPartitions of pausedTopicPartitions) {
const { topic, partitions } = topicPartitions
console.log({ topic, partitions })
}
>>: Day 21 - WooCommerce: 信用卡付款设定选项 (上)
前言: 大家好,这是我第一次参加铁人赛 主要是想记录一下自己学过的东西 并和大家分享一些我觉得很重要...
前言 DevOps 是一种理念,目的是让开发到发布的速度、稳定性都能提升。 而 CI/CD 是实践 ...
useMemo 用於性能优化,避免重复执行高效能的渲染 如果传入的参数未改变,就直接沿用上次的计算结...
大家好,我是17King~ d(`・∀・)b 跟大家报告一个好消息! 我的书终於出版啦!!! (拍手...
嗨,我是 A Fei,来看看今天的题目: (题目来源:Codewars) Take the foll...