“They say ignorance is bliss.... they're wrong ”
― Franz Kafka
删除指定主题的讯息,会删除到指定分区、指定偏移量之前的所有讯息,如果想要删除该分区所有讯息偏移量要设定为 -1
需要注意的是,这边是无法指定删除开始的点,一率都是从头开始删除的
await admin.deleteTopicRecords({
topic: <String>,
partitions: <SeekEntry[]>,
})
范例
await admin.deleteTopicRecords({
topic: 'custom-topic',
partitions: [
{ partition: 0, offset: '30' }, // delete up to and including offset 29
{ partition: 3, offset: '-1' }, // delete all available records on this partition
]
})
const {
AclResourceTypes,
AclOperationTypes,
AclPermissionTypes,
ResourcePatternTypes,
} = require('kafkajs')
const acl = [
{
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-name',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:bob',
host: '*',
operation: AclOperationTypes.ALL,
permissionType: AclPermissionTypes.DENY,
},
{
resourceType: AclResourceTypes.TOPIC,
resourceName: 'topic-name',
resourcePatternType: ResourcePatternTypes.LITERAL,
principal: 'User:alice',
host: '*',
operation: AclOperationTypes.ALL,
permissionType: AclPermissionTypes.ALLOW,
},
]
await admin.createAcls({ acl })
这边需要注意安全机制可能在丛集那边没开启,如果是这种情况会喷错误:KafkaJSProtocolError: Security features are disabled
const {
AclResourceTypes,
AclOperationTypes,
AclPermissionTypes,
ResourcePatternTypes,
} = require('kafkajs')
const acl = {
resourceName: 'topic-name,
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation: AclOperationTypes.ANY,
resourcePatternType: ResourcePatternTypes.LITERAL,
}
await admin.deleteAcls({ filters: [acl] })
// {
// filterResponses: [
// {
// errorCode: 0,
// errorMessage: null,
// matchingAcls: [
// {
// errorCode: 0,
// errorMessage: null,
// resourceType: AclResourceTypes.TOPIC,
// resourceName: 'topic-name',
// resourcePatternType: ResourcePatternTypes.LITERAL,
// principal: 'User:alice',
// host: '*',
// operation: AclOperationTypes.ALL,
// permissionType: AclPermissionTypes.ALLOW,
// },
// ],
// },
// ],
// }
这边需要注意安全机制可能在丛集那边没开启,如果是这种情况会喷错误:KafkaJSProtocolError: Security features are disabled
const {
AclResourceTypes,
AclOperationTypes,
AclPermissionTypes,
ResourcePatternTypes,
} = require('kafkajs')
await admin.describeAcls({
resourceName: 'topic-name,
resourceType: AclResourceTypes.TOPIC,
host: '*',
permissionType: AclPermissionTypes.ALLOW,
operation: AclOperationTypes.ANY,
resourcePatternTypeFilter: ResourcePatternTypes.LITERAL,
})
// {
// resources: [
// {
// resourceType: AclResourceTypes.TOPIC,
// resourceName: 'topic-name,
// resourcePatternType: ResourcePatternTypes.LITERAL,
// acls: [
// {
// principal: 'User:alice',
// host: '*',
// operation: AclOperationTypes.ALL,
// permissionType: AclPermissionTypes.ALLOW,
// },
// ],
// },
// ],
// }
这边需要注意安全机制可能在丛集那边没开启,如果是这种情况会喷错误:KafkaJSProtocolError: Security features are disabled
要开启Kafka 代理上的 ACL 安全机制,要在设定档 config/server-ssl.properties
加上以下设定
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
重新启动 Kafka 代理後,去查看相关 log,你会看会喷错 ClusterAuthorizationException
在每一行 log 的尾端
org.apache.kafka.common.errors.ClusterAuthorizationException: Request Request(processor=0, connectionId=127.0.0.1:9093-127.0.0.1:62402-0, session=Session(User:CN=localhost,/127.0.0.1), listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=null) is not authorized.
这是因为 User:CN=localhost
没有被授权去动作,预设上是没有帐号可以进行任何动作的
未授权错误会记录在 INFO level 的 log,位置预设在 logs/kafka-authorizer.log,你应该可以查看到 ClusterAuthorizationException
喷错记录下来的 log
Principal = User:CN=localhost is Denied Operation = ClusterAction from host = 127.0.0.1 on resource = Cluster:LITERAL:kafka-cluster for request = UpdateMetadata with resourceRefCount = 1
资料来源:
https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-demo-acl-authorization.html
<<: 【第二七天 - Flutter 知名外送平台画面练习(下)】
>>: 【27】遇到不平衡资料(Imbalanced Data) 时 使用 Undersampling 解决实验
线上 Ruby 编辑器:https://runrb.io/ Ruby String 文件:http...
继续开工~intent 的搜寻关键字-android intent action https://d...
前言:本篇目的为简要纪录函式内的变数种类与功能、参数赋予及厘清Callback function的运...
昨天介绍完如何跳页,今天将会分享如何跳页传值。 成品: 刚执行模拟器的样子 按下 Button 後会...
来练习读取 Google 文件中的表格内容吧。 今日要点: 》Document Service 文...