“I have spent all my life resisting the desire to end it.”
― Franz Kafka, Letters to Milena
To be or not to be
当消费者耗尽所有的重试次数後,一个非同步 ( async
) 的函式会被呼叫来决定是否重启消费者(重新设定consumer.run
),例如,这可用於在崩溃之前乾净地关闭资源,如果这是首选的话。该函数将传递错误,这允许它根据错误类型决定是否退出应用程序或允许它重新启动。
这个函数具有以下签名: (error: Error) => Promise<boolean>
如果 promise 解析为 true:消费者将重新启动
如果 promise 解析为 false:消费者将不会重新启动
如果承诺拒绝:消费者将重新启动
如果没有提供 restartOnFailure:消费者将重新启动
请注意,该函数只会在 KafkaJS 认为可重试的错误时被调用。对於不可重试的错误,不会重新启动使用者并且不会调用 restartOnFailure 函数。请参阅此列表以了解 Kafka 协议中的可重试错误,但请注意,在 KafkaJS 中仍会认为某些其他错误是可重试的,例如网络连接错误。
KafkaJS 是支援 Kafka transactions
注记: 交易在 Kafka 版本v0.11以上才有支援
在交易中传送讯息
呼叫异步方式 producer.transaction()
来初始化一个交易,回传的交易物件包含有方法 send 和 sendBatch
,交易完成时可以呼叫方法 transaction.commit()
或是 transaction.abort()
来结束交易,消费者只会看到那些已经提交 ( commit ) 的交易,避免读取稍後 rollback 掉的资料ㄡ
注记: Kafka 需要生产者做到以下设定去保证讯息只会传送一次( EoS ("Exactly-once-semantics"))
生产者必须设定 max flight request 为1
生产者必须设定 acks 为-1,也就是必须等待所有副本都回应收到
生产者必须有无限的重试次数
为了要生效 EOS 必须设定 maxInFlightRequests: 1
和 idempotent: true
const client = new Kafka({
clientId: 'transactional-client',
brokers: ['kafka1:9092', 'kafka2:9092'],
})
const producer = client.producer({ maxInFlightRequests: 1, idempotent: true })
一次交易中可以传送多个讯息,当呼叫 transaction.abort 时,所有的讯息都会 rollback
const transaction = await producer.transaction()
try {
await transaction.send({ topic, messages })
await transaction.commit()
} catch (e) {
await transaction.abort()
}
传送偏移量
将传送偏移量视为交易的一部分,也就是代表只有交易成功时才会去提交偏移量,这边使用方法 transaction.sendOffsets()
。
await transaction.sendOffsets({
consumerGroupId, topics
})
topics 的结构如下:
[{
topic: <String>,
partitions: [{
partition: <Number>,
offset: <String>
}]
}]
<<: Day17 - 物理模拟篇 - 弹力、引力与磁力II - 成为Canvas Ninja ~ 理解2D渲染的精髓
>>: stack heap内存、预编译、作用域链 - 概念介绍
在语句中常会出现概念相似的词,包括某类物品、地名、时间...等。例如,轮椅、拐杖、助行器、电动床都属...
自从国高中我的房间出现了电脑,虽然是很废的文书机,但它还是在夜深人静的凌晨抚慰我睡不着的心情。方形的...
继承昨日的漫谈,当一个专案已经开始拆解步骤和预期的交付产出,同时专案经理就会心里琢磨着「这件工作要找...
0x1 前言 昨天订单回覆有个 Web ATM URL 好吸引我,想去瞧一下里面长什麽样, 今天也把...