[Day 27] 实作 Redis PubSub Keyspace Notification 订阅 Session Key Expired 事件通知

session authentication 的机制是

  • 登入时建立 sessionId 储存 session 资料至 Redis
  • 验证时使用 sessionId 寻找 Redis 是否存在 session
  • 登出时使用 sessionId 删除 Redis session 资料

以上操作都是使用 sessionId 操作某个 user 的 session 资料,如果系统允许使用者重复登入的话,我们怎麽反过来知道某个使用者拥有那些 sessionId?

直觉上的解决方式就是在登入时,除了记录 sessionId 与 session 资料的关联,也一并把 sessionId 加入至 userId 与 sessionId 的一对多 sessionId 清单。反之,登出时也一并从 user 的 sessionId 清单中删除此 sessionId。但是这麽做还不够,因为当 sessionId 逾期而被 Redis 删除时,并不会连动更新 sessionId 清单,这必须由我们系统自行把过期的 sessionId 从 sessionId 清单删除。不过问题来了,我们怎麽知道 sessionId 逾期被 Redis 删除,系统总不能一直使用 sessionId 去询问 Redis 是否还存在,然後再去更新 sessionId 清单,这种作法在使用者数量龎大时根本不可行。

Redis PubSub Keyspace Notification

还好 Redis 有提供 Redis Keyspace Notifications 机制,我们系统可以先订阅 keyspace 的事件通知,当 sessionId 过期时 Redis 会通知我们。不过使用此功能前要注意以下事项,详细说明请参考官方文件

  • 支援 Redis 2.8.0 以上版本
  • Redis 不会储存事件,所以如果系统与 Redis 失去连线的话,就收不到通知了,可靠性较低
  • 预设不启用避免消耗 CPU 资源,必须先执行以下指令开启
redis-cli config set notify-keyspace-events KEA

Redis Keyspace Notifications 的流程上分成两阶段,下面以 Redis 指令及传输资料来说明

  1. 系统先向 Redis 订阅 expired 事件
执行指令 psubscribe '__keyevent*__:expired'
然後收到成功的回应资料 "psubscribe","__keyevent*__:expired",1
  1. 後续当 session 过期时,系统会收到 expired 事件通知
"pmessage","__keyevent*__:expired","__keyevent@0__:expired","我是sessionId"

接下来我们系统再从过期的 sessionId 找出 userId,然後再根据 userId 去更新使用者的 sessionId 清单即可。本专案的 sessionId 清单资料也储存於 Redis,但使用的是 Hash 资料结构,至於原始的 session 资料是使用 String 资料结构。

  • [String] session 资料: key => sessionId ; value => session data (json)
  • [Hash] sessionId 清单: key => userId ; field => sessionId ; value => expireTime (也可以储存其它资料)

修改 ktor-client-redis 函式库,实作订阅 Keyspace Notification 功能

我使用的 Redis Client 是 ktor-client-redis,它是 JetBrains 开发的实验性专案,只有实作基本的 PubSub 功能,所以我必须修改原始码,在 PubSub 的基础上实作 KeyspaceNotification 功能。更多 ktor-client-redis 的资讯,请看我先前写的文章 => [Day 25] 实作 Redis Plugin 整合 Redis Coroutine Client

首先是修改 ktor-client-redis 的 PubSub.kt 原始码,我在 RedisPubSub 里面增加 Packet 子类别 KeyspaceNotification,同时也在 mapToPacket 方法里,把从 Redis 收到资料转为 KeyspaceNotification 物件。

点我连结至修改後的 PubSub.kt 程序码

interface RedisPubSub {

    interface Packet

    data class Message(val channel: String, val message: String, val isPattern: Boolean = false) : Packet

    data class Subscription(val channel: String, val subscriptions: Long, val subscribe: Boolean, val isPattern: Boolean = false) : Packet

    data class KeyspaceNotification(val channel: String, val isKeyEvent: Boolean, val event: String, val key: String) :
        IdentifiableObject<String>(), Packet {

        override val id: String
            get() = "$event-$key"
    }
}

private fun CoroutineScope.mapToPacket(rawChannel: ReceiveChannel<Any>) = produce(capacity = Channel.UNLIMITED) {
    for (data in rawChannel) {
        logger.debug("data = $data")
        val list = data as List<Any>
        val kind = String(list[0] as ByteArray)
        val channel = String(list[1] as ByteArray)

        val isPattern = kind.startsWith("p")
        val isMessage = kind == "message"
        val isSubscription = kind.startsWith("psub") || kind.startsWith("sub")
        val isPMessage = kind == "pmessage"
        val isKeyspaceNotification = isPMessage && list.size == 4

        val packet = when {
            isMessage -> RedisPubSub.Message(channel, String(list[2] as ByteArray), isPattern)
            isSubscription -> RedisPubSub.Subscription(channel, list[2] as Long, isSubscription, isPattern)
            isKeyspaceNotification -> {
                val info = String(list[2] as ByteArray)
                val pMessage = String(list[3] as ByteArray)
                val isKeyEvent = info.contains("keyevent")
                val event = if (isKeyEvent) info.substringAfterLast(":") else pMessage
                val key = if (isKeyEvent) pMessage else info.substringAfterLast(":")
                RedisPubSub.KeyspaceNotification(channel, isKeyEvent, event, key)
            }
            else -> error("Undefined Redis PubSub raw data: $list")
        }
        logger.debug("packet = $packet")
        send(packet)
    }
}

因为 PubSub 收到的资料有多种格式,所以在此使用 coroutine channel pipeline 的方式实作,过滤掉其它格式的资料,只保留 KeyspaceNotification 物件,这样就完成底层接收 Redis 事件通知资料的部分了。

private fun CoroutineScope.filterKeyspaceNotification(channel: ReceiveChannel<RedisPubSub.Packet>) = produce {
    for (packet in channel) {
        if (packet is RedisPubSub.KeyspaceNotification)
            send(packet)
    }
}

fun CoroutineScope.redisKeyspaceNotificationChannel(redisPubSub: RedisPubSub): ReceiveChannel<RedisPubSub.KeyspaceNotification> =
    filterKeyspaceNotification(mapToPacket(((redisPubSub as RedisPubSubImpl).rawChannel)))

实作 RedisKeyspaceNotificationListener 处理 Keyspace Notification

虽然我目前只需要订阅 session key expired 事件而已,但考量到 Redis KeyspaceNotification 可以应用到任何 key 的事件通知,所以我决定先在底层实作泛用的 RedisKeyspaceNotificationListenerRedisKeyspaceNotificationListener 的内部实作上是利用我之前做好的泛用 CoroutineActor,我们可以设定处理事件的 coroutine 数量及 dispatcher,与底层收取事件通知的 PubSub 的 dispatcher 分开,藉此调校效能。CoroutineActor 内部一样使用 coroutine channel pipeline 的方式实作,先从上游接收底层 PubSub 收到的资料 => val upstreamChannel = redisKeyspaceNotificationChannel(redisPubSub),然後再丢到 CoroutineActor 内部的 channel,最後再执行先前注册到 RedisKeyspaceNotificationListener 的 lambda。

※ 详细 CoroutineActor 的实作,请看我先前写的文章 => [Day 21] 使用 Coroutine SendChannel 处理非同步工作

点我连结到完整的 RedisKeyspaceNotificationListener 程序码

class RedisKeyspaceNotificationListener(
    config: KeyspaceNotificationConfig,
    redisPubSub: RedisPubSub,
    logWriter: LogWriter
) {

    private val logger = KotlinLogging.logger {}

    private val actorName = "RedisKeyspaceNotificationListener"

    private val keyspaceNotificationBlocks: MutableList<suspend (RedisPubSub.KeyspaceNotification) -> Unit> = mutableListOf()

    private val actor: CoroutineActor<RedisPubSub.KeyspaceNotification> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        config.processor, Dispatchers.IO,
        this::processKeyspaceNotification, { channel ->
            launch {
                val upstreamChannel = redisKeyspaceNotificationChannel(redisPubSub)
                for (message in upstreamChannel) {
                    try {
                        channel.send(message) // non-blocking if Channel.UNLIMITED
                    } catch (e: Throwable) { // ignore CancellationException because we don't call channel.cancel()
                        var errorMsg = ""
                        if (e is ClosedSendChannelException) {
                            errorMsg = "$actorName is closed"
                            logger.warn("$errorMsg => $message")
                        } else {
                            errorMsg = "$actorName unexpected error"
                            logger.error("$errorMsg => $message", e)
                        }
                        logWriter.write(
                            ErrorLog.internal(
                                InternalServerException(InfraResponseCode.REDIS_ERROR, errorMsg, e, mapOf("message" to message)),
                                actorName, message.id
                            )
                        )
                        // TODO: persistence unDeliveredMessage and retry later
                    }
                }
            }
        }
    )

    private suspend fun processKeyspaceNotification(data: RedisPubSub.KeyspaceNotification) {
        keyspaceNotificationBlocks.forEach { it(data) }
    }

    fun subscribeKeyspaceNotification(block: suspend (RedisPubSub.KeyspaceNotification) -> Unit) {
        keyspaceNotificationBlocks.add(block)
    }

    fun shutdown() {
        actor.close()
    }
}

Redis Plugin 初始化 RedisKeyspaceNotificationListener

以上功能都实作完了之後,最後就是整合至 Ktor Plugin 了。我先在 Redis Plugin 里面,根据外部设定档 application.conf 读取 Redis 连线设定及 RedisKeyspaceNotificationListener 设定。然後建立 PubSub 专用的 RedisClient 物件,与原有的 RedisClient 物件分开,因为这里我只想设定一个单执行绪处理事件通知就好。接下来再执行 psubscribe 指令向 Redis 订阅事件通知,最後再初始化 RedisKeyspaceNotificationListener 就完成了。

点我连结到完整的 RedisPlugin 程序码

redis {
    host = ${REDIS_HOST}
    port = ${REDIS_PORT}
    password = ${?REDIS_PASSWORD}
    rootKeyPrefix = "fanpoll-"${app.server.env}
    client {
        coroutines = 3
        dispatcher {
            fixedPoolSize = 1
        }
    }
    subscribe {
        patterns = ["__keyevent*__:expired"]
        channels = []
        keyspaceNotification {
            processor {
                coroutines = 3
                dispatcher {
                    fixedPoolSize = 1
                }
            }
        }
    }
}
override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
    val configuration = Configuration().apply(configure)
    val feature = RedisFeature(configuration)

    val appConfig = pipeline.get<MyApplicationConfig>()
    val logWriter = pipeline.get<LogWriter>()
    config = appConfig.infra.redis ?: configuration.build()

    initClient(config)

    if (config.subscribe != null) {
        initSubscriber(config, logWriter)
    }

    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { client }
                if (keyspaceNotificationListener != null)
                    single { keyspaceNotificationListener }
                KoinApplicationShutdownManager.register { shutdown() }
            }
        )
    }

    return feature
}
        
private fun initSubscriber(config: RedisConfig, logWriter: LogWriter) {
    logger.info("========== init Redis PubSub subscriber... ==========")
    try {
        subscribeDispatcher = CoroutineUtils.createDispatcher(subscribeDispatcherName, ThreadPoolConfig(fixedPoolSize = 1))
        subscribeClient = RedisClient(
            address = InetSocketAddress(config.host, config.port),
            password = config.password,
            maxConnections = 1,
            dispatcher = subscribeDispatcher, rootKeyPrefix = config.rootKeyPrefix
        )

        with(config.subscribe!!) {
            val redisPubSub = runBlocking {
                val subscriber = if (!patterns.isNullOrEmpty())
                    subscribeClient.psubscribe(*patterns.toTypedArray())
                else null

                if (!channels.isNullOrEmpty()) {
                    subscriber?.subscribe(*channels.toTypedArray()) ?: subscribeClient.subscribe(*channels.toTypedArray())
                } else subscriber
            }!!

            if (keyspaceNotification != null) {
                keyspaceNotificationListener = RedisKeyspaceNotificationListener(keyspaceNotification, redisPubSub, logWriter)
            }
        }
    } catch (e: Throwable) {
        throw InternalServerException(InfraResponseCode.REDIS_ERROR, "fail to init Redis PubSub subscriber", e)
    }
    logger.info("========== init Redis PubSub subscriber completed ==========")
}

RedisSessionStorage 透过 RedisKeyspaceNotificationListener 订阅 Session Key Expired 事件

昨天的文章 [Day 26] 实作 Ktor Session Authentication with Redis 有提到 SessionAuth Plugin 会建立 RedisSessionStorage 物件用来操作 Session 资料。初始化RedisSessionStorage 的时候会呼叫 subscribeSessionKeyExpired() 方法,把处理 session key expired 事件的 lambda 注册到 RedisKeyspaceNotificationListener 里面。

一旦收到 expired 事件通知,会从 key 也就是 sessionId 字串取出 userId,然後再执行 redis HDEL 指令删除 userId 清单中的 sessionId。这里为了简化实作把 userId 夹带在 sessionId 之中,如果不想这麽做,那就要自己产生另一个随机 id,并储存对应表於资料库。

点我连结到完整的 RedisSessionStorage 程序码

class RedisSessionStorage(
    private val sessionConfig: SessionConfig,
    private val redisClient: RedisClient,
    private val redisKeyspaceNotificationListener: RedisKeyspaceNotificationListener? = null,
    private val logWriter: LogWriter
) : SessionService {

    // Redis Key => sessionId -> session data
    private val sessionKeyPrefix: String = redisClient.rootKeyPrefix + ":session:"
    
    // Redis Key => userId -> sessionId List
    private val sessionIdsKeyPrefix: String = redisClient.rootKeyPrefix + ":sessionIds:"
    
    init {
        if (redisKeyspaceNotificationListener != null) {
            subscribeSessionKeyExpired()
        }
    }
    
    override suspend fun deleteSession(session: UserSession) {
        logger.debug("logout session: ${session.id}")

        val sessionKey = buildSessionKey(session)
        val sessionIdsKey = buildSessionIdKey(session.id.userId)

        redisClient.del(sessionKey)
        redisClient.hdel(sessionIdsKey, sessionKey)
    }

    private suspend fun setSession(session: UserSession, startTime: Instant, expireDuration: Duration?) {
        session.value.expireTime = expireDuration?.let { startTime.plus(it) }

        val sessionKey = buildSessionKey(session)
        val sessionIdsKey = buildSessionIdKey(session.id.userId)

        redisClient.set(
            sessionKey,
            json.encodeToString(UserSession.Value.serializer(), session.value),
            expireDuration?.toMillis()
        )
        redisClient.hset(
            sessionIdsKey, sessionKey,
            session.value.expireTime?.let { DateTimeUtils.UTC_DATE_TIME_FORMATTER.format(it) } ?: ""
        )
    }
    
    private fun subscribeSessionKeyExpired() {
        redisKeyspaceNotificationListener!!.subscribeKeyspaceNotification { notification ->
            if (notification.isKeyEvent && notification.event == "expired" && notification.key.startsWith(sessionKeyPrefix)) {
                logger.debug { "session key expired: ${notification.key}" }
                try {
                    val segments = notification.key.split(":")
                    val userId = UUID.fromString(segments[5])
                    val sessionKey = notification.key
                    redisClient.hdel(buildSessionIdKey(userId), sessionKey)
                } catch (e: Throwable) {
                    val errorMsg = "subscribeSessionKeyExpired error"
                    logger.error("$errorMsg => $notification", e)
                    logWriter.write(
                        ErrorLog.internal(
                            InternalServerException(
                                InfraResponseCode.REDIS_KEY_NOTIFICATION_ERROR, errorMsg, e,
                                mapOf("notification" to notification)
                            ),
                            "SessionService", "subscribeSessionKeyExpired"
                        )
                    )
                }
            }
        }
    }
}   

成果展示

下图是 Server 启动时的 log,最後一行是系统执行指令 psubscribe '__keyevent*__:expired' 向 Redis 订阅 Keyspace notifications 的回应资料 "psubscribe","__keyevent*__:expired",1

下图是当 session 过期时,收到 Redis 通知的 log

收到的原始资料是

"pmessage","__keyevent*__:expired","__keyevent@0__:expired","fanpoll-dev:session:ops:user:user:5711da3a-6096-46fc-bce9-2bf70af27d85:631ad1a0926235174f3b7601476cfff3"
``

<<:  Day20 - 使用Django进行自动化测试 (2)

>>:  框架在手,工作我有:MockK的简介?真的只是简介⋯⋯

[Day13] Node.js & NPM

从头到尾硬干一个网站是非常费功费时的事情,为了节省开发时间就会尽量引用现成的套件,随着引用的套件越来...

[DAY 5] _stm32f103c8t6开发板暂存器开发_控制MCU的GPIO High、Low范例

想走嵌入式系统开发这行必经的路,直接了解最底层怎麽运作的,Arduino底层也是这样运作的,只是Ar...

【Day 26】渲染备忘:Memo

React.memo React.memo 主要的作用是性能优化, 使用 memo 後,程序会将 r...

[从0到1] C#小乳牛 练成基础程序逻辑 Day 7 - 程序码收纳术 3种注解

回忆最美~ | 那还不快记下来Rrr | 把Code搓圆拍扁 🐄点此填写今日份随堂测验 ...

PHP 乱数产生介於 0 到 1 之间的浮点数

前言 有一次在串接合作厂商的 API 时,需要把他原本取 token 的 javascript 改写...