卡夫卡的藏书阁【Book23】- Kafka - KafkaJS 监控状态事件

“I miss you deeply, unfathomably, senselessly, terribly.”
― Franz Kafka, Letters to Milena


监控状态事件 ( Instrumentation Events )


部分的操作状态会被 EventEmitter 监听,可以用以下这些方法去取得事件的状态 consumer.on(), producer.on()admin.on(),范例如下

const { HEARTBEAT } = consumer.events
const removeListener = consumer.on(HEARTBEAT, e => console.log(`heartbeat at ${e.timestamp}`))

这些监听者都是非同步的,消费者不会因为你启动了这些监听者而导致阻塞,监听者若是发生错误并不会影响到消费者

以下为一个监听事件

{
  id: <Number>,
  type: <String>,
  timestamp: <Number>,
  payload: <Object>
}

所有可以监听的事件清单


消费者
  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 发生在每次向代理发起网路请求时
  • CONNECT
    • payload: 无
    • 消费者连线到代理
  • GROUP_JOIN
    • payload: {groupId, memberId, leaderId, isLeader, memberAssignment, groupProtocol, duration}
    • 消费者加入消费者群组
  • FETCH_START
    • payload: {}
    • 开始从(单数/复数)代理取得讯息
  • FETCH
    • payload: {numberOfBatches, duration}
    • 结束从(单数/复数)代理取得讯息
  • START_BATCH_PROCESS
    • payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset}
    • 开始使用批次取得讯息
  • END_BATCH_PROCESS
    • payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset, duration}
    • payload: 无
    • 结束使用批次取得讯息,包含使用方法 eachMessage/eachBatch 时
  • COMMIT_OFFSETS
    • payload: {groupId, memberId, groupGenerationId, topics}
    • 提交偏移量
  • STOP
    • payload: 无
    • 消费者停止运作
  • DISCONNECT
    • payload: 无
    • 消费者中断连线
  • CRASH
    • payload: {error, groupId, restart}
    • 消费者错误,在消费者错误的情况下,消费者会想要重启自己,如果这个错误不是重启可以解决的,消费者会停止并中断连线,你的应用程序应该要监听这个事件去做出判断
  • HEARTBEAT
    • payload: {groupId, memberId, groupGenerationId}
    • 传送给协调器的心跳
  • REBALANCING
    • payload: {groupId, memberId}
    • 消费者群组开始重新分配
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 发送给代理的请求超时
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同时并存的请求数量是依靠参数 maxInflightRequests 控制,如果数量有变化,这个事件会被触发
  • RECEIVED_UNSUBSCRIBED_TOPICS
    • payload: {groupId, generationId, memberId, assignedTopics, topicsSubscribed, topicsNotSubscribed}
    • 如果你的消费者群组部分的消费者订阅的主题跟其余的消费者订阅的主题有差异的话,此事件会被触发

生产者


  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 发生在每次向代理发起网路请求时
  • CONNECT
    • payload: 无
    • 生产者连线到代理
  • DISCONNECT
    • payload: 无
    • 生产者中断连线
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 向代理发起的请求超时
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同时并存的请求数量是依靠参数 maxInflightRequests 控制,如果数量有变化,这个事件会被触发

管理者 ( admin )


  • REQUEST
    • payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
    • 发生在每次向代理发起网路请求时
  • CONNECT
    • payload: 无
    • 管理者向代理发起连线
  • DISCONNECT
    • payload: 无
    • 管理者中断连线
  • REQUEST_TIMEOUT
    • payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
    • 向代理发起的请求超时
  • REQUEST_QUEUE_SIZE
    • payload: {broker, clientId, queueSize}
    • 同时并存的请求数量是依靠参数 maxInflightRequests 控制,如果数量有变化,这个事件会被触发

<<:  #22 No-code 之旅 — 静态网站可以部署到哪里?~

>>:  JavaScript Callback(回呼)

asin反查,怎么通过流量来定位关键词?

一般网上购物选准关键词是至关重要的,买家基本都是通过搜索关键词来购买产品的。只有卖家设置的关键词与产...

图的连通 (5)

9.2 找出分离点对 (Separating Pair) 如果一个点的子集合移除以後,会让图 G 变...

前端工程学习日记第13天

目标做成: 结果作成: code: html: <!DOCTYPE html> <...

[Day-15] for回圈

今天也是要练习回圈 但不是while而是for回圈 while以及for两者都是回圈 其实功能基本上...

状态流程图与有限状态机

状态图 (State Machine) ,是类似於本文章要介绍的主轴: 有限状态机 (Finite-...