卡夫卡的藏书阁【Book23】- Kafka - KafkaJS 监控状态事件

“I miss you deeply, unfathomably, senselessly, terribly.”
― Franz Kafka, Letters to Milena


监控状态事件 ( Instrumentation Events )


部分的操作状态会被 EventEmitter 监听,可以用以下这些方法去取得事件的状态 consumer.on(), producer.on()admin.on(),范例如下

const { HEARTBEAT } = consumer.events
const removeListener = consumer.on(HEARTBEAT, e => console.log(`heartbeat at ${e.timestamp}`))

这些监听者都是非同步的,消费者不会因为你启动了这些监听者而导致阻塞,监听者若是发生错误并不会影响到消费者

以下为一个监听事件

{
  id: <Number>,
  type: <String>,
  timestamp: <Number>,
  payload: <Object>
}

所有可以监听的事件清单


消费者
  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 发生在每次向代理发起网路请求时
  • CONNECT
    • payload: 无
    • 消费者连线到代理
  • GROUP_JOIN
    • payload: {groupId, memberId, leaderId, isLeader, memberAssignment, groupProtocol, duration}
    • 消费者加入消费者群组
  • FETCH_START
    • payload: {}
    • 开始从(单数/复数)代理取得讯息
  • FETCH
    • payload: {numberOfBatches, duration}
    • 结束从(单数/复数)代理取得讯息
  • START_BATCH_PROCESS
    • payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset}
    • 开始使用批次取得讯息
  • END_BATCH_PROCESS
    • payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset, duration}
    • payload: 无
    • 结束使用批次取得讯息,包含使用方法 eachMessage/eachBatch 时
  • COMMIT_OFFSETS
    • payload: {groupId, memberId, groupGenerationId, topics}
    • 提交偏移量
  • STOP
    • payload: 无
    • 消费者停止运作
  • DISCONNECT
    • payload: 无
    • 消费者中断连线
  • CRASH
    • payload: {error, groupId, restart}
    • 消费者错误,在消费者错误的情况下,消费者会想要重启自己,如果这个错误不是重启可以解决的,消费者会停止并中断连线,你的应用程序应该要监听这个事件去做出判断
  • HEARTBEAT
    • payload: {groupId, memberId, groupGenerationId}
    • 传送给协调器的心跳
  • REBALANCING
    • payload: {groupId, memberId}
    • 消费者群组开始重新分配
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 发送给代理的请求超时
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同时并存的请求数量是依靠参数 maxInflightRequests 控制,如果数量有变化,这个事件会被触发
  • RECEIVED_UNSUBSCRIBED_TOPICS
    • payload: {groupId, generationId, memberId, assignedTopics, topicsSubscribed, topicsNotSubscribed}
    • 如果你的消费者群组部分的消费者订阅的主题跟其余的消费者订阅的主题有差异的话,此事件会被触发

生产者


  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 发生在每次向代理发起网路请求时
  • CONNECT
    • payload: 无
    • 生产者连线到代理
  • DISCONNECT
    • payload: 无
    • 生产者中断连线
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 向代理发起的请求超时
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同时并存的请求数量是依靠参数 maxInflightRequests 控制,如果数量有变化,这个事件会被触发

管理者 ( admin )


  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 发生在每次向代理发起网路请求时
  • CONNECT
    • payload: 无
    • 管理者向代理发起连线
  • DISCONNECT
    • payload: 无
    • 管理者中断连线
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 向代理发起的请求超时
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同时并存的请求数量是依靠参数 maxInflightRequests 控制,如果数量有变化,这个事件会被触发

<<:  #22 No-code 之旅 — 静态网站可以部署到哪里?~

>>:  JavaScript Callback(回呼)

[C 语言笔记--Day22] 6.S081 Lab syscall: 在 xv6 中新增一个 System Call

关於 xv6 的环境架设,可以参考我之前写的这篇文章 6.S082 课程连结(我这里用的是 2021...

Day 23 XIB跳转页面以及UIAlertController的练习(1/3)

今天我们来练习,XIB的跳页功能跟Alert提示框吧~ 首先拉一个Button,按下後提示框跳出,按...

从 IT 技术面细说 Search Console 的 27 组数字 KPI (6) :网页

记得在日本的一本 SEO 书写了一个很生动的范例,一间公司业绩要成长,取决於业务员的数量,而网站的...

Day30 - Windows 提权(1)-Unquoted Service Paths、修改服务提权

假设我们取得受害主机的 shell (cmd.exe 或 powershell)可以根据自己的需求取...

Day 22 - Formatter 与 Linter - 提升程序品质工具

前言 昨天讲完 Code Review,团队一致的写 code 风格,可以大幅提升 review 的...