卡夫卡的藏书阁【Book13】- KafkaJS 生产者 1

“Slept, awoke, slept, awoke, miserable life.”
― franz kafka
很多人40岁就死了,80岁才入葬,供勉之


今天要创建一个生产者推送讯息给 Kafka,很单纯的只要呼叫 KafkaJS 客户端的 producer function 即可

const producer = kafka.producer()

可选选项们

可选选项 说明 预设数值
createPartitioner 创建一个分区器,之後会再详述 null
retry 设定重试机制,之後会再详述 null
metadataMaxAge 在 partition leader 没有改变(没有新增代理或是分区)的情况,可以设定间隔多长的区间(毫秒)去强制更新一次元资料,预设是五分钟 300000毫秒(5分钟)
allowAutoTopicCreation 当消费者指定一个尚不存在的主题进行消费时,是否要自动创建该主题 true
transactionTimeout 事务状态会等待多久才去更新的时间区间(毫秒),如果这个数值大於代理 (broker) 中的 transaction.max.timeout.ms 的设定数值,这个事务请求会喷错、并且显示InvalidTransactionTimeout error 60000
idempotent 这个参数目前还在实验阶段,如果开启这个参数生产者将会确保每一条讯息只会写入一次,必须将 Acks 设定为 -1、重试次数会设定为 MAX_SAFE_INTEGER false
maxInFlightRequests 是否设定同时可以行进的 request 上限数量,如果没有设定就是无上限 null

这边很简单的传入索引键 (key) 和讯息 (value),分区将会以预设的分区器,去决定该消息要传递的分区

const producer = kafka.producer()

await producer.connect()
await producer.send({
    topic: 'first-topic',
    messages: [
        { key: 'key1', value: 'hello kafka' },
        { key: 'key2', value: 'I am message!' },
        { key: 'key3', value: 'No metter what' }
    ],
})

当然也可以自订要传送的分区,直接输入该分区的编号即可,像是 partition: 0

const producer = kafka.producer()

await producer.connect()
await producer.send({
    topic: 'first-topic',
    messages: [
        { key: 'key1', value: 'hello kafka', partition: 0 },
        { key: 'key2', value: 'I am message!', partition: 1 },
        { key: 'key3', value: 'No metter what', partition: 2 }
    ],
})

明天会再详细叙述各个设定参数的详细内容


<<:  LeetCode解题 Day27

>>:  Day 27-Unit Test 应用於使用重构与测试手法优化 C# Code-1 (情境及应用-7)

尾声,故事主角继续写着故事

写到这里,持续三十天真的让人焦躁、又觉得期待自己离里程碑更进了。 一开始有不少存稿,可计画总是赶不上...

第十四天:在 TeamCity 上执行程序码风格检查

昨天我们在专案里导入了 ktlint 这个用来检查程序码排版风格的套件,我们可以透过 Gradle ...

What is The Best Way to Save Emails in Outlook?

When we used the term best for any entity that mea...

Day 2 为什麽要学Compose UI?

嗨!大家好,我是Teng: 今年的疫情蛮严重的,希望大家都过得安好, 希望疫情快点过去,能回到一些线...

Day 13 - 安装(三)副本调度设定

前面提到region会产生副本分散在每一个tikv store里,今天如果架设的机器是跨区的,跨机房...