“You are at once both the quiet and the confusion of my heart.”
― Franz Kafka
可以取得丛集中代理的资讯,通常跟讯息、事件处理无关,这些资讯是用来监测跟做维护动作
await admin.describeCluster()
// {
// brokers: [
// { nodeId: 0, host: 'localhost', port: 9092 }
// ],
// controller: 0,
// clusterId: 'f8QmWTB8SQSLE6C99G4qzA'
// }
取得指定资源的设定资讯
await admin.describeConfigs({
includeSynonyms: <boolean>,
resources: <ResourceConfigQuery[]>
})
ResourceConfigQuery 的结构如下
{
type: <ConfigResourceType>,
name: <String>,
configNames: <String[]>
}
回传指定资源的所有设定档
const { ConfigResourceTypes } = require('kafkajs')
await admin.describeConfigs({
includeSynonyms: false,
resources: [
{
type: ConfigResourceTypes.TOPIC,
name: 'topic-name'
}
]
})
回传指定资源的指定设定档内容
const { ConfigResourceTypes } = require('kafkajs')
await admin.describeConfigs({
includeSynonyms: false,
resources: [
{
type: ConfigResourceTypes.TOPIC,
name: 'topic-name',
configNames: ['cleanup.policy']
}
]
})
资源类型对照表如下:
UNKNOWN: 0,
TOPIC: 2,
BROKER: 4,
BROKER_LOGGER: 8,
回传范例
{
resources: [
{
configEntries: [{
configName: 'cleanup.policy',
configValue: 'delete',
isDefault: true,
configSource: 5,
isSensitive: false,
readOnly: false
}],
errorCode: 0,
errorMessage: null,
resourceName: 'topic-name',
resourceType: 2
}
],
throttleTime: 0
}
alterConfigs 可以修改指定资源的相关设定
await admin.alterConfigs({
validateOnly: false,
resources: <ResourceConfig[]>
})
ResourceConfig 的结构如下
{
type: <ConfigResourceType>,
name: <String>,
configEntries: <ResourceConfigEntry[]>
}
ResourceConfigEntry 的结构如下
{
name: <String>,
value: <String>
}
范例
const { ConfigResourceTypes } = require('kafkajs')
await admin.alterConfigs({
resources: [{
type: ConfigResourceTypes.TOPIC,
name: 'topic-name',
configEntries: [{ name: 'cleanup.policy', value: 'compact' }]
}]
})
回传范例
{
resources: [{
errorCode: 0,
errorMessage: null,
resourceName: 'topic-name',
resourceType: 2,
}],
throttleTime: 0,
}
取得代理中有效的消费者群组清单
await admin.listGroups()
回传范例
{
groups: [
{groupId: 'testgroup', protocolType: 'consumer'}
]
}
用消费者群组ID去取得消费者群组的详细资料,使用方式跟 consumer.describeGroup() (https://kafka.js.org/docs/next/consuming#describe-group) 类似,但是允许你取得多个消费者群组的资料
await admin.describeGroups([ 'testgroup' ])
// {
// groups: [{
// errorCode: 0,
// groupId: 'testgroup',
// members: [
// {
// clientHost: '/172.19.0.1',
// clientId: 'test-3e93246fe1f4efa7380a',
// memberAssignment: Buffer,
// memberId: 'test-3e93246fe1f4efa7380a-ff87d06d-5c87-49b8-a1f1-c4f8e3ffe7eb',
// memberMetadata: Buffer,
// },
// ],
// protocol: 'RoundRobinAssigner',
// protocolType: 'consumer',
// state: 'Stable',
// }]
// `
可以使用 AssignerProtocol 的解码方法去拿到 memeberMetadata 和 memberAssignment 的资料
范例如下
const memberMetadata = AssignerProtocol.MemberMetadata.decode(memberMetadata)
const memberAssignment = AssignerProtocol.MemberAssignment.decode(memberAssignment)
用消费者群组ID去删除消费者群组
注记:只能够删除没有跟任何消费者有连线的消费者群组
await admin.deleteGroups([groupId])
范例:
await admin.deleteGroups(['group-test'])
范例回传
[
{groupId: 'testgroup', errorCode: 'consumer'}
]
因为此方法可以一次删除多个消费者群组,其中一个或多个群组可能会删除失败,这些资讯会在报错讯息中
try {
await admin.deleteGroups(['a', 'b', 'c'])
} catch (error) {
// error.name 'KafkaJSDeleteGroupsError'
// error.groups = [{
// groupId: a
// error: KafkaJSProtocolError
// }]
}
>>: 我的JavaScript日常- 第 31 天不是结束,反而是开始
BMI计算器 今天来个简单的综合应用,将前面所介绍的东西放在一起,结合设计出一个BMI计算器。这次介...
今天来整理一下以前的笔记,聊聊比较分类模型的评判依据:confusion matrix. 下图是常见...
问题回答 在 Vue 2,我们需要使用 .set() 等 Vue 语法来修改在 data 里的物件或...
如果想快速使用 Hook ,其实就参考 Hook 概观分的五个面向,包含一定会用也最常用的 Stat...
在Hook尚未出现之前,只有Class Component能够有生命周期可以使用。 什麽是生命周期?...