[Day 25] 实作 Redis Plugin 整合 Redis Coroutine Client

目前 Redis 几乎已成为後端微服务架构的必备基础设施,但是 Ktor 官方连 ORM 都没有整合了,Redis Client 当然也不会有啦,所以第一步就先来挑选 Redis Client。

选择 Redis Client 函式库 ktor-client-redis

以往 Redis Java Client 大多是使用 Jedis,近年来大家逐渐改用 Lettuce,而且 Spring Boot 2.x 版本也预设采用了。这是因为 Lettuce 底层是 Netty,所以支援 non-blocking 操作及 asynchronous API,效能会比 Jedis 好,理所当然是写新专案的最佳选择。不过我秉持着做 side project 就是要学习新技术及自干的精神,就是想找基於 Kotlin Coroutine 的 library 来用,结果真的让我在 Github 找到 JetBrains 官方团队开发的 ktor-client-redis

不过不要高兴的太早,因为 ktor-client-redis 只是实验性专案,不建议用在 production 环境,而且最後一次 commit 已经是3年前。後来我在 twitter 上面询问未来专案的计划,Ktor 官方团队成员回答 Unfortunately, it is dead at the moment Orz...。但是我当然不会因此放弃,因为我只想拿 Redis 做为 User Session Storage,应该只会用到基本的 get, set, del 指令而已。事实上,到目前为止,我使用上都没遇到什麽问题,但如果未来要应付大流量的话,那还是要改用 Lettuce 比较有保障。

整合 ktor-client-redis 原始码

因为 ktor-client-redis 没有 publish 到 maven repository,所以必须要下载原始码放到专案里面。果不其然,马上就遇到无法编译的问题,毕竟我是用最新版的 coroutine library,有些 API 已经改了。不过还好都是些小调整,而且 ktor-client-redis 的 redis 指令实作方式是放在独立的 kt 档案,例如 Hashes.kt 包含 hset, hget...指令,所以没用到的指令,我就不把对应的 kt 档放到我的专案里了。

实作 RedisPlugin

点我连结到完整的 Redis Plugin 程序码

定义 RedisConfig

我们先实作从外部设定档 application.conf 读取 Redis 连线设定。因为 ktor-client-redis 是基於 Coroutine,所以可以重复使用先前定义的 CoroutineActorConfig 物件来调校效能。

redis {
    host = ${?REDIS_HOST}
    port = ${?REDIS_PORT}
    password = ${?REDIS_PASSWORD}
    rootKeyPrefix = "fanpoll-"${app.server.env}
    client {
        coroutines = 10
        dispatcher {
            minPoolSize = 1
            maxPoolSize = 3
        }
    }
}
data class RedisConfig(
    val host: String, val port: Int = 6379, val password: String?, val rootKeyPrefix: String,
    val client: CoroutineActorConfig
)

data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {

    fun validate() {
        dispatcher?.validate()
    }

    class Builder {

        var coroutines: Int = 1
        private var dispatcher: ThreadPoolConfig? = null

        fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
            dispatcher = ThreadPoolConfig.Builder().apply(block).build()
        }

        fun build(): CoroutineActorConfig {
            return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
        }
    }
}

data class ThreadPoolConfig(
    val fixedPoolSize: Int? = 1,
    val minPoolSize: Int? = null,
    val maxPoolSize: Int? = null,
    val keepAliveTime: Long? = null
) : ValidateableConfig {

    fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)

    override fun validate() {
        require(
            (minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
                    (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
        ) {
            "minPoolSize, maxPoolSize, keepAliveTime should be configured"
        }
    }

    class Builder {

        var fixedPoolSize: Int? = 1
        var minPoolSize: Int? = null
        var maxPoolSize: Int? = null
        var keepAliveTime: Long? = null

        fun build(): ThreadPoolConfig {
            return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
        }
    }
}

建立 RedisClient 物件,注册至 Koin DI

Redis Plugin 读取设定值後,就根据 RedisConfig 建立 RedisClient 物件。RedisClient 内部实作是透过 requestQueue: Channel<RedisRequest> 送出指令封包 ByteReadPacket,再使用 CompletableDeferred<Any?> 接收结果。其中 RedisClient 的最大连线数量 maxConnections 就是对应到我们所建立的 coroutine 数量,预设的 coroutine dispatcher 是 Dispatchers.Default,不过我们可以替换掉它。

建立连线後,随即执行 ping() 测试一下 latency。然後注册至 Koin DI,後续要操作 Redis 时,就可以透过 Koin 取得 RedisClient 物件。

最後不要忘记加上 KoinApplicationShutdownManager.register { closeClient() },在停止 Server 时呼叫 quit() 关闭 Redis 连线。

private lateinit var config: RedisConfig
private lateinit var client: RedisClient
private const val dispatcherName = "Redis"
private lateinit var dispatcher: CoroutineDispatcher
        
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
    val configuration = Configuration().apply(configure)
    val feature = RedisFeature(configuration)

    val appConfig = pipeline.get<MyApplicationConfig>()
    val logWriter = pipeline.get<LogWriter>()
    config = appConfig.infra.redis ?: configuration.build()

    initClient(config)

    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { client }
                KoinApplicationShutdownManager.register { closeClient() }
            }
        )
    }

    return feature
}
        
private fun initClient(config: RedisConfig) {
    logger.info("========== init Redis Client... ==========")
    try {
        dispatcher = if (config.client.dispatcher != null)
            CoroutineUtils.createDispatcher(dispatcherName, config.client.dispatcher)
        else Dispatchers.IO

        logger.info("connect to redis => $config")
        client = RedisClient(
            address = InetSocketAddress(config.host, config.port),
            password = config.password,
            maxConnections = config.client.coroutines,
            dispatcher = dispatcher, rootKeyPrefix = config.rootKeyPrefix
        )

        runBlocking {
            logger.info("ping...")
            val latency = measureTimeMillis {
                client.ping()?.let {
                    logger.info(it)
                }
            }
            logger.info("ping latency = $latency milliseconds")
        }
    } catch (e: Throwable) {
        throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init redis client", e)
    }
    logger.info("========== init Redis Client completed ==========")
}

private fun closeClient() {
    try {
        runBlocking {
            logger.info("close redis connection...")
            client.quit()
            logger.info("redis connection closed")
        }

        if (dispatcher is ExecutorCoroutineDispatcher) {
            CoroutineUtils.closeDispatcher(dispatcherName, dispatcher as ExecutorCoroutineDispatcher)
        }
    } catch (e: Throwable) {
        throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to close redis connection", e)
    }
}

完成结果


<<:  Day 15 - 用 useReducer 取代 useState !?

>>:  [Day 15]RDBMS / NoSQL

Day9-D3绘图:绘制形状的Helper Functions

本篇大纲:Generator、Component、Layout 截至目前,我们已经学会 D3 如何...

Day18 Android - RecyclerView应用

此元件是在Android 5.0提供的元件,相较於ListView,RecyclerView比较有弹...

Python课 第一式

print 输出指令 ?print Docstring: print(value, ..., sep...

Day11:开发 MVP

开发 mvp ...

Day24:安全性和演算法-迪菲-赫尔曼金钥交换(Diffie-Hellman Key Exchange)

前言 前一天提到的Hybrid Cryptosystem,其中使用到的「key」会进行封装,避免被有...