目前 Redis 几乎已成为後端微服务架构的必备基础设施,但是 Ktor 官方连 ORM 都没有整合了,Redis Client 当然也不会有啦,所以第一步就先来挑选 Redis Client。
以往 Redis Java Client 大多是使用 Jedis,近年来大家逐渐改用 Lettuce,而且 Spring Boot 2.x 版本也预设采用了。这是因为 Lettuce 底层是 Netty,所以支援 non-blocking 操作及 asynchronous API,效能会比 Jedis 好,理所当然是写新专案的最佳选择。不过我秉持着做 side project 就是要学习新技术及自干的精神,就是想找基於 Kotlin Coroutine 的 library 来用,结果真的让我在 Github 找到 JetBrains 官方团队开发的 ktor-client-redis。
不过不要高兴的太早,因为 ktor-client-redis 只是实验性专案,不建议用在 production 环境,而且最後一次 commit 已经是3年前。後来我在 twitter 上面询问未来专案的计划,Ktor 官方团队成员回答 Unfortunately, it is dead at the moment
Orz...。但是我当然不会因此放弃,因为我只想拿 Redis 做为 User Session Storage,应该只会用到基本的 get, set, del 指令而已。事实上,到目前为止,我使用上都没遇到什麽问题,但如果未来要应付大流量的话,那还是要改用 Lettuce 比较有保障。
因为 ktor-client-redis 没有 publish 到 maven repository,所以必须要下载原始码放到专案里面。果不其然,马上就遇到无法编译的问题,毕竟我是用最新版的 coroutine library,有些 API 已经改了。不过还好都是些小调整,而且 ktor-client-redis 的 redis 指令实作方式是放在独立的 kt 档案,例如 Hashes.kt 包含 hset, hget...指令,所以没用到的指令,我就不把对应的 kt 档放到我的专案里了。
我们先实作从外部设定档 application.conf 读取 Redis 连线设定。因为 ktor-client-redis 是基於 Coroutine,所以可以重复使用先前定义的 CoroutineActorConfig 物件来调校效能。
redis {
host = ${?REDIS_HOST}
port = ${?REDIS_PORT}
password = ${?REDIS_PASSWORD}
rootKeyPrefix = "fanpoll-"${app.server.env}
client {
coroutines = 10
dispatcher {
minPoolSize = 1
maxPoolSize = 3
}
}
}
data class RedisConfig(
val host: String, val port: Int = 6379, val password: String?, val rootKeyPrefix: String,
val client: CoroutineActorConfig
)
data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {
fun validate() {
dispatcher?.validate()
}
class Builder {
var coroutines: Int = 1
private var dispatcher: ThreadPoolConfig? = null
fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
dispatcher = ThreadPoolConfig.Builder().apply(block).build()
}
fun build(): CoroutineActorConfig {
return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
}
}
}
data class ThreadPoolConfig(
val fixedPoolSize: Int? = 1,
val minPoolSize: Int? = null,
val maxPoolSize: Int? = null,
val keepAliveTime: Long? = null
) : ValidateableConfig {
fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
override fun validate() {
require(
(minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
(minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
) {
"minPoolSize, maxPoolSize, keepAliveTime should be configured"
}
}
class Builder {
var fixedPoolSize: Int? = 1
var minPoolSize: Int? = null
var maxPoolSize: Int? = null
var keepAliveTime: Long? = null
fun build(): ThreadPoolConfig {
return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
}
}
}
Redis Plugin 读取设定值後,就根据 RedisConfig 建立 RedisClient 物件。RedisClient 内部实作是透过 requestQueue: Channel<RedisRequest>
送出指令封包 ByteReadPacket
,再使用 CompletableDeferred<Any?>
接收结果。其中 RedisClient 的最大连线数量 maxConnections
就是对应到我们所建立的 coroutine 数量,预设的 coroutine dispatcher 是 Dispatchers.Default,不过我们可以替换掉它。
建立连线後,随即执行 ping()
测试一下 latency。然後注册至 Koin DI,後续要操作 Redis 时,就可以透过 Koin 取得 RedisClient 物件。
最後不要忘记加上 KoinApplicationShutdownManager.register { closeClient() }
,在停止 Server 时呼叫 quit()
关闭 Redis 连线。
private lateinit var config: RedisConfig
private lateinit var client: RedisClient
private const val dispatcherName = "Redis"
private lateinit var dispatcher: CoroutineDispatcher
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
val configuration = Configuration().apply(configure)
val feature = RedisFeature(configuration)
val appConfig = pipeline.get<MyApplicationConfig>()
val logWriter = pipeline.get<LogWriter>()
config = appConfig.infra.redis ?: configuration.build()
initClient(config)
pipeline.koin {
modules(
module(createdAtStart = true) {
single { client }
KoinApplicationShutdownManager.register { closeClient() }
}
)
}
return feature
}
private fun initClient(config: RedisConfig) {
logger.info("========== init Redis Client... ==========")
try {
dispatcher = if (config.client.dispatcher != null)
CoroutineUtils.createDispatcher(dispatcherName, config.client.dispatcher)
else Dispatchers.IO
logger.info("connect to redis => $config")
client = RedisClient(
address = InetSocketAddress(config.host, config.port),
password = config.password,
maxConnections = config.client.coroutines,
dispatcher = dispatcher, rootKeyPrefix = config.rootKeyPrefix
)
runBlocking {
logger.info("ping...")
val latency = measureTimeMillis {
client.ping()?.let {
logger.info(it)
}
}
logger.info("ping latency = $latency milliseconds")
}
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init redis client", e)
}
logger.info("========== init Redis Client completed ==========")
}
private fun closeClient() {
try {
runBlocking {
logger.info("close redis connection...")
client.quit()
logger.info("redis connection closed")
}
if (dispatcher is ExecutorCoroutineDispatcher) {
CoroutineUtils.closeDispatcher(dispatcherName, dispatcher as ExecutorCoroutineDispatcher)
}
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to close redis connection", e)
}
}
<<: Day 15 - 用 useReducer 取代 useState !?
本篇大纲:Generator、Component、Layout 截至目前,我们已经学会 D3 如何...
此元件是在Android 5.0提供的元件,相较於ListView,RecyclerView比较有弹...
print 输出指令 ?print Docstring: print(value, ..., sep...
开发 mvp ...
前言 前一天提到的Hybrid Cryptosystem,其中使用到的「key」会进行封装,避免被有...