卡夫卡的藏书阁【Book12】- KafkaJS 安装

“By believing passionately in something that still does not exist, we create it. The nonexistent is whatever we have not sufficiently desired.”
― Franz Kafka
吸引力法则 只要有心人人都可以是食神


Kafka 脚本的练习到这边大概结束了,之後 Kafka mirror 的部分会放到後面再介绍

今天开始会用 KafkaJS 这个套件来练习 Kafka 的各种运作,基本的生产者和消费者会需要做哪些设定,相信有实际的范例会更清楚了解 Kafka 的机制。

这边使用的是 KafkaJS 的 Pre-release versions

安装 KafkaJS

安装的方式就是基本的 npm 和 yarn

npm install kafkajs

或是

yarn add kafkajs

基本用法

一开始可以将 KafkaJS 的客户端指定一个或是多个代理 ( broker ) 去实例化它

const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
})

接着,可以利用 KafkaJS 客户端来创建一个生产者 ( producer ),指定一个主题向它传递讯息,并在最後中断连结

const producer = kafka.producer()

await producer.connect()
await producer.send({
  topic: 'test-topic',
  messages: [
    { value: 'Hello KafkaJS user!' },
  ],
})

await producer.disconnect()

然後,可以创建一个消费者来消费刚刚产生的讯息,确定刚刚有产生讯息成功,一样是指定订阅主题 test-topic,并且指定 fromBeginning 为启动,从这个主题的第一笔讯息开始消费,如果设定关闭就是消化订阅当下之後新产生的讯息

const consumer = kafka.consumer({ groupId: 'test-group' })

await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
})

今天是先简单的入门的范例,明天会开始介绍生产者的相关设定概念,像是传递的方式、分区器、顺序性、新旧生产者API..等


<<:  Day12:今天来谈一下Microsoft-Defender-for-Endpoint的设定及管理自动化

>>:  EP14 - 调整架构 EC2 与负载平衡器

[重构倒数第19天] - i18n什麽的交给前端来处理吧(二) Vue3 载入Vue-i18n

前言 该系列是为了让看过Vue官方文件或学过Vue但是却不知道怎麽下手去重构现在有的网站而去规画的系...

Day30 - 使用 Rails Generator 快速实作卡米狗学说话

LINE Developers:https://developers.line.biz/zh-ha...

[Golang]panic是什麽?-心智图总结

1. panic是什麽? 程序在运行时,发生意料之外的程序异常。例如: 访问,不存在的array。 ...

用Firebase Web的小功能分享 (3)

上传档案後制作超连结下载档案 - 用innerHTML制作超连结 code 最後就是如何显示map...

WP Rocket 一整年难得的7折优惠+7款外挂测速比较

黑色购物节,只有9天:所有方案7折 折扣内容:所有方案皆享「7折」优惠 折扣期间:2020/11/...