卡夫卡的藏书阁【Book27】- Kafka - KafkaJS Admin 4

“They say ignorance is bliss.... they're wrong ”
― Franz Kafka


删除主题的讯息

删除指定主题的讯息,会删除到指定分区、指定偏移量之前的所有讯息,如果想要删除该分区所有讯息偏移量要设定为 -1
需要注意的是,这边是无法指定删除开始的点,一率都是从头开始删除的

await admin.deleteTopicRecords({
    topic: <String>,
    partitions: <SeekEntry[]>,
})

范例

await admin.deleteTopicRecords({
    topic: 'custom-topic',
    partitions: [
        { partition: 0, offset: '30' }, // delete up to and including offset 29
        { partition: 3, offset: '-1' }, // delete all available records on this partition
    ]
})

新增 存取控制安全机制 ( create ACL )

const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')

const acl = [
  {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-name',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:bob',
    host: '*',
    operation: AclOperationTypes.ALL,
    permissionType: AclPermissionTypes.DENY,
  },
  {
    resourceType: AclResourceTypes.TOPIC,
    resourceName: 'topic-name',
    resourcePatternType: ResourcePatternTypes.LITERAL,
    principal: 'User:alice',
    host: '*',
    operation: AclOperationTypes.ALL,
    permissionType: AclPermissionTypes.ALLOW,
  },
]

await admin.createAcls({ acl })

这边需要注意安全机制可能在丛集那边没开启,如果是这种情况会喷错误:KafkaJSProtocolError: Security features are disabled

删除 存取控制安全机制 ( DELETE ACL )

const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')

const acl = {
  resourceName: 'topic-name,
  resourceType: AclResourceTypes.TOPIC,
  host: '*',
  permissionType: AclPermissionTypes.ALLOW,
  operation: AclOperationTypes.ANY,
  resourcePatternType: ResourcePatternTypes.LITERAL,
}

await admin.deleteAcls({ filters: [acl] })
// {
//   filterResponses: [
//     {
//     errorCode: 0,
//     errorMessage: null,
//     matchingAcls: [
//         {
//         errorCode: 0,
//         errorMessage: null,
//         resourceType: AclResourceTypes.TOPIC,
//         resourceName: 'topic-name',
//         resourcePatternType: ResourcePatternTypes.LITERAL,
//         principal: 'User:alice',
//         host: '*',
//         operation: AclOperationTypes.ALL,
//         permissionType: AclPermissionTypes.ALLOW,
//         },
//     ],
//     },
//   ],
// }

这边需要注意安全机制可能在丛集那边没开启,如果是这种情况会喷错误:KafkaJSProtocolError: Security features are disabled

取得存取控制安全机制的相关资讯 ( Describe ACL )

const {
  AclResourceTypes,
  AclOperationTypes,
  AclPermissionTypes,
  ResourcePatternTypes,
} = require('kafkajs')

await admin.describeAcls({
  resourceName: 'topic-name,
  resourceType: AclResourceTypes.TOPIC,
  host: '*',
  permissionType: AclPermissionTypes.ALLOW,
  operation: AclOperationTypes.ANY,
  resourcePatternTypeFilter: ResourcePatternTypes.LITERAL,
})
// {
//   resources: [
//     {
//       resourceType: AclResourceTypes.TOPIC,
//       resourceName: 'topic-name,
//       resourcePatternType: ResourcePatternTypes.LITERAL,
//       acls: [
//         {
//           principal: 'User:alice',
//           host: '*',
//           operation: AclOperationTypes.ALL,
//           permissionType: AclPermissionTypes.ALLOW,
//         },
//       ],
//     },
//   ],
// }

这边需要注意安全机制可能在丛集那边没开启,如果是这种情况会喷错误:KafkaJSProtocolError: Security features are disabled

开启 ACL 安全机制

要开启Kafka 代理上的 ACL 安全机制,要在设定档 config/server-ssl.properties 加上以下设定

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

重新启动 Kafka 代理後,去查看相关 log,你会看会喷错 ClusterAuthorizationException 在每一行 log 的尾端

org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:62402-0, session=Session(User:CN=localhost,/127.0.0.1), listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=null) is not authorized.

这是因为 User:CN=localhost 没有被授权去动作,预设上是没有帐号可以进行任何动作的

查看安全机制相关 log

未授权错误会记录在 INFO level 的 log,位置预设在 logs/kafka-authorizer.log,你应该可以查看到 ClusterAuthorizationException 喷错记录下来的 log

Principal = User:CN=localhost is Denied Operation = ClusterAction from host = 127.0.0.1 on resource = Cluster:LITERAL:kafka-cluster for request = UpdateMetadata with resourceRefCount = 1

资料来源:
https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-demo-acl-authorization.html


<<:  【第二七天 - Flutter 知名外送平台画面练习(下)】

>>:  【27】遇到不平衡资料(Imbalanced Data) 时 使用 Undersampling 解决实验

Day16 - 用简单的字串替换实作价值上亿的机器人

线上 Ruby 编辑器:https://runrb.io/ Ruby String 文件:http...

第12天~

继续开工~intent 的搜寻关键字-android intent action https://d...

为了转生而点技能-JavaScript,day11(函式内的变数、Callback function

前言:本篇目的为简要纪录函式内的变数种类与功能、参数赋予及厘清Callback function的运...

DAY 18 『 画面间跳页传值 - Protocol And Delegate 』

昨天介绍完如何跳页,今天将会分享如何跳页传值。 成品: 刚执行模拟器的样子 按下 Button 後会...

【Day 16】Google Apps Script - API 篇 - Document Service - 文件服务范例-读取表格

来练习读取 Google 文件中的表格内容吧。 今日要点: 》Document Service 文...