卡夫卡的藏书阁【Book14】- KafkaJS 生产者 - 压缩 2

“Paths are made by walking”
― Franz Kafka
千里之行,始於足下


KafkaJS 为了要尽量降低关联的套件数量,预设只有包含 GZIP,但是未来是有考虑要包含其他压缩方式进来

接下来我们在生产者的 send 加上可选选项压缩方式(optional compression),并且指定为 GZIP

const { CompressionTypes } = require('kafkajs')

async () => {
  await producer.send({
    topic: 'topic-name',
    compression: CompressionTypes.GZIP,
    messages: [
        { key: 'key1', value: 'hello world' },
        { key: 'key2', value: 'hey hey!' }
    ],
  })
}

消费者是可以自己解开zip压缩,不需要额外的动作,在之後的消费者篇章会提到

Snappy

安装套件 kafkajs-snappy 就可以使用 snappy 来压缩
Snappy是用C++开发的压缩和解压缩开发包,旨在提供高速压缩速度和合理的压缩率。虽然生成的压缩档案可能会比其他压缩库的要大上20%至100%,但是,相比其他的压缩库,Snappy却能够在特定的压缩率下拥有惊人的压缩速度。

npm install --save kafkajs-snappy

# yarn add kafkajs-snappy
const {  CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')

CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

LZ4

安装套件 kafkajs-lz4 就可以使用 LZ4

npm install --save kafkajs-lz4
# yarn add kafkajs-lz4
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const LZ4 = require('kafkajs-lz4')

CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec

kafkajs-lz4 套件还有支援一些可选选项去调整 LZ4 的压缩程度
如下,可以设定最高压缩度(16)

const lz4Codec = new LZ4Codec({
    preferences: {
        compressionLevel: 16
    }
}).codec;
 
CompressionCodecs[CompressionTypes.LZ4] = lz4Codec;

Compression level 设定范围是从 1 ~ 16,数字越大代表压缩比越大,如果设定大於16也会同样视为16,压缩比是用压缩速度交换而来,通常建议的设定值是1、代表最快的压缩速度,同时也是预设值,如果想要高压缩比建议设定为9,当然,压缩速度和压缩比率的关系会依据要压缩资料的而有所不同、所以需要视情况调整。
解压缩速度的部分,不管 Compression level 设定为何速度都会一样快。

ZSTD

要使用 Zstandard 要安装套件 @kafkajs/zstd

npm install --save @kafkajs/zstd
# yarn add @kafkajs/zstd
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const ZstdCodec = require('@kafkajs/zstd')

CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec()

带入参数一样可以设定压缩比,压缩速度最高是 level 1

const compressionParams = { level: 1 }
const decompressionParams = {}
CompressionCodecs[CompressionTypes.ZSTD] = ZstdCodec(compressionParams, decompressionParams)

客制化编码解码器
可以简单地用现成的 lib 去实作编码解码器 ( codec ),一个编码解码器是一个有两个 async 方法的物件,分别是压缩 ( compress ) 和解压缩 ( decompress )

const MyCustomSnappyCodec = {
    async compress(encoder) {
        return someCompressFunction(encoder.buffer)
    },

    async decompress(buffer) {
        return someDecompressFunction(buffer)
    }
}

现在将它实作

const { CompressionTypes, CompressionCodecs } = require('kafkajs')
CompressionCodecs[CompressionTypes.Snappy] = () => MyCustomSnappyCodec

新的编码解码器 ( codec ) 现在可以使用了

await producer.send({
    topic: 'topic-name',
    compression: CompressionTypes.Snappy,
    messages: [
        { key: 'key1', value: 'hello world' },
        { key: 'key2', value: 'hey hey!' }
    ],
})

p.s. 编解码器对应的英文「codec」(compress和decompress简化而成的合成词语)


  • 压缩比:LZ4 > GZIP > Snappy
  • 吞吐量:LZ4 > Snappy > GZIP

资料来源:
https://github.com/indix/kafkajs-lz4
https://github.com/tulios/kafkajs-snappy
https://github.com/kafkajs/zstd


<<:  Youtube Data API 教学 - 告一个段落

>>:  【Day13-计数】如何快速统计资料出现频率?——使用Counter或groupby快速计算元素出现个数

结语

前言 今年13th铁人终於来到了尾声,又过了一年时间过得很快,今年是第二年的挑战(依旧主管迫害啊~)...

浮点数的二进位表达方法

浮点数的二进位表达方法 浮点运算知识点 小数二进制表达 与整数的二进制表达相同我们可以假设任意小数的...

[DAY 16] _Si7020温湿度读写

今天来说说温湿度读取的部分吧,首先来看看这颗的Datasheet: https://www.sila...

敏捷方法或框架-极限编程(XP)提供了最多的程序开发实务

敏捷是一种心态,秉承《敏捷软件开发和实践宣言》中所述的四个价值观和十二个原则。敏捷是一个笼统的术语。...

[Day 10] 模型达到商业指标的挑战 — Test set performance 的殒落

Achieving low average tested error isn't good eno...