“I miss you deeply, unfathomably, senselessly, terribly.”
― Franz Kafka, Letters to Milena
监控状态事件 ( Instrumentation Events )
部分的操作状态会被 EventEmitter
监听,可以用以下这些方法去取得事件的状态 consumer.on()
, producer.on()
和 admin.on()
,范例如下
const { HEARTBEAT } = consumer.events
const removeListener = consumer.on(HEARTBEAT, e => console.log(`heartbeat at ${e.timestamp}`))
这些监听者都是非同步的,消费者不会因为你启动了这些监听者而导致阻塞,监听者若是发生错误并不会影响到消费者
以下为一个监听事件
{
id: <Number>,
type: <String>,
timestamp: <Number>,
payload: <Object>
}
所有可以监听的事件清单
消费者
- REQUEST
- payload:
{broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
- 发生在每次向代理发起网路请求时
- CONNECT
- GROUP_JOIN
- payload:
{groupId, memberId, leaderId, isLeader, memberAssignment, groupProtocol, duration}
- 消费者加入消费者群组
- FETCH_START
- payload: {}
- 开始从(单数/复数)代理取得讯息
- FETCH
- payload: {numberOfBatches, duration}
- 结束从(单数/复数)代理取得讯息
- START_BATCH_PROCESS
- payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset}
- 开始使用批次取得讯息
- END_BATCH_PROCESS
- payload: {topic, partition, highWatermark, offsetLag, offsetLagLow, batchSize, firstOffset, lastOffset, duration}
- payload: 无
- 结束使用批次取得讯息,包含使用方法 eachMessage/eachBatch 时
- COMMIT_OFFSETS
- payload: {groupId, memberId, groupGenerationId, topics}
- 提交偏移量
- STOP
- DISCONNECT
- CRASH
- payload: {error, groupId, restart}
- 消费者错误,在消费者错误的情况下,消费者会想要重启自己,如果这个错误不是重启可以解决的,消费者会停止并中断连线,你的应用程序应该要监听这个事件去做出判断
- HEARTBEAT
- payload: {groupId, memberId, groupGenerationId}
- 传送给协调器的心跳
- REBALANCING
- payload: {groupId, memberId}
- 消费者群组开始重新分配
- REQUEST_TIMEOUT
- payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
- 发送给代理的请求超时
- REQUEST_QUEUE_SIZE
- payload: {broker, clientId, queueSize}
- 同时并存的请求数量是依靠参数
maxInflightRequests
控制,如果数量有变化,这个事件会被触发
- RECEIVED_UNSUBSCRIBED_TOPICS
- payload: {groupId, generationId, memberId, assignedTopics, topicsSubscribed, topicsNotSubscribed}
- 如果你的消费者群组部分的消费者订阅的主题跟其余的消费者订阅的主题有差异的话,此事件会被触发
生产者
- REQUEST
- payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
- 发生在每次向代理发起网路请求时
- CONNECT
- DISCONNECT
- REQUEST_TIMEOUT
- payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
- 向代理发起的请求超时
- REQUEST_QUEUE_SIZE
- payload: {broker, clientId, queueSize}
- 同时并存的请求数量是依靠参数
maxInflightRequests
控制,如果数量有变化,这个事件会被触发
管理者 ( admin )
- REQUEST
- payload: {broker, clientId, correlationId, size, createdAt, sentAt, pendingDuration, duration, apiName, apiKey, apiVersion}
- 发生在每次向代理发起网路请求时
- CONNECT
- DISCONNECT
- REQUEST_TIMEOUT
- payload: {broker, clientId, correlationId, createdAt, sentAt, pendingDuration, apiName, apiKey, apiVersion}
- 向代理发起的请求超时
- REQUEST_QUEUE_SIZE
- payload: {broker, clientId, queueSize}
- 同时并存的请求数量是依靠参数
maxInflightRequests
控制,如果数量有变化,这个事件会被触发