“Slept, awoke, slept, awoke, miserable life.”
― franz kafka
很多人40岁就死了,80岁才入葬,供勉之
今天要创建一个生产者推送讯息给 Kafka,很单纯的只要呼叫 KafkaJS 客户端的 producer function 即可
const producer = kafka.producer()
可选选项 | 说明 | 预设数值 |
---|---|---|
createPartitioner | 创建一个分区器,之後会再详述 | null |
retry | 设定重试机制,之後会再详述 | null |
metadataMaxAge | 在 partition leader 没有改变(没有新增代理或是分区)的情况,可以设定间隔多长的区间(毫秒)去强制更新一次元资料,预设是五分钟 | 300000毫秒(5分钟) |
allowAutoTopicCreation | 当消费者指定一个尚不存在的主题进行消费时,是否要自动创建该主题 | true |
transactionTimeout | 事务状态会等待多久才去更新的时间区间(毫秒),如果这个数值大於代理 (broker) 中的 transaction.max.timeout.ms 的设定数值,这个事务请求会喷错、并且显示InvalidTransactionTimeout error |
60000 |
idempotent | 这个参数目前还在实验阶段,如果开启这个参数生产者将会确保每一条讯息只会写入一次,必须将 Acks 设定为 -1、重试次数会设定为 MAX_SAFE_INTEGER | false |
maxInFlightRequests | 是否设定同时可以行进的 request 上限数量,如果没有设定就是无上限 | null |
这边很简单的传入索引键 (key) 和讯息 (value),分区将会以预设的分区器,去决定该消息要传递的分区
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'first-topic',
messages: [
{ key: 'key1', value: 'hello kafka' },
{ key: 'key2', value: 'I am message!' },
{ key: 'key3', value: 'No metter what' }
],
})
当然也可以自订要传送的分区,直接输入该分区的编号即可,像是 partition: 0
const producer = kafka.producer()
await producer.connect()
await producer.send({
topic: 'first-topic',
messages: [
{ key: 'key1', value: 'hello kafka', partition: 0 },
{ key: 'key2', value: 'I am message!', partition: 1 },
{ key: 'key3', value: 'No metter what', partition: 2 }
],
})
明天会再详细叙述各个设定参数的详细内容
>>: Day 27-Unit Test 应用於使用重构与测试手法优化 C# Code-1 (情境及应用-7)
写到这里,持续三十天真的让人焦躁、又觉得期待自己离里程碑更进了。 一开始有不少存稿,可计画总是赶不上...
昨天我们在专案里导入了 ktlint 这个用来检查程序码排版风格的套件,我们可以透过 Gradle ...
When we used the term best for any entity that mea...
嗨!大家好,我是Teng: 今年的疫情蛮严重的,希望大家都过得安好, 希望疫情快点过去,能回到一些线...
前面提到region会产生副本分散在每一个tikv store里,今天如果架设的机器是跨区的,跨机房...