[Day 10] 实作 Ktor Graceful Shutdown

各种 Graceful Shutdown 的作法

一个稳定的服务必须要考虑当停止服务时,已经接受的 request 必须要处理完才行,否则轻则使用者会收到服务器错误,观感不佳,重则造成资料遗失或不一致。如果是微服务架构,那麽更是无法简单地 rollback 恢复资料。

一般 graceful shutdown 作法是 Server 接收到 stop signal 之後,就不再接收 request,然後等待一段时间消化进行中的 request 再停止,如果超过时间则强制关闭。通常 Web 框架会提供 Shutdown hook 机制,当 Server 停止的时候,先呼叫你实作的函式再停止,我们可以在这个函式里关闭开启的资源,例如关闭 Database, Redis 的连线。

除了关闭资源之外,等待所有非同步工作处理完毕也是必须考虑的。例如使用者当前的操作需要寄送 email 通知,我们通常会把寄送动作丢到另一个执行绪处理,藉此加快回应速度。那麽当 Server 停止的时候,我们要如何等待这些执行绪做完所有工作了呢? 在此举3个不同解决方案的例子

  • Spring Boot 提供 ThreadPoolTaskExecutor 实作,我们可以设定 setWaitForTasksToCompleteOnShutdown(true)setAwaitTerminationSeconds(30) 阻塞等待 30 秒後,再继续执行停止 server 的工作。

  • Akka Actor 提供 gracefulStop 函式,最久等待5秒处理剩余的工作。

    import static akka.pattern.Patterns.gracefulStop;
    import akka.pattern.AskTimeoutException;
    import java.util.concurrent.CompletionStage;
    
    try {
      CompletionStage<Boolean> stopped =
          gracefulStop(actorRef, Duration.ofSeconds(5), Manager.SHUTDOWN);
      stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);
      // the actor has been stopped
    } catch (AskTimeoutException e) {
      // the actor wasn't stopped within 5 seconds
    }
    
  • Coroutine Channel 可以参考 Roman Elizarov 大大在 StackOverflow 回答的作法,透过呼叫 close() 传送特别的 close token 到 queue 里面,一旦 channel 接收到这种讯息就会立即停止接收新的讯息。另一边 ReceiveChannel 处理完最後一个讯息之後,填入 CompletableDeferred 结果值通知 SendChannel。最後建议可以在 withTimeout 里面呼叫 await() 设定最久等待时间

    val lastProcessed = CompletableDeferred<Message?>() 
    val actor = actor<Message> {
        var last: Message? = null
        try {
            for (msg in channel) {
                // process message
                last = msg
            }
        } finally {
            // report the last processed message via back channel
            lastProcessed.complete(last)
        }
    }
    
    actor.close()
    val last = lastProcessed.await() // receive last processed message
    

接下来说明我是如何实作 Ktor graceful shutdown

Ktor ShutDownUrl Plugin

首先我们必须先安装 ShutDownUrl Plugin,设定 shutdown endpoint 让我们呼叫触发 shutdown

ktor {
    deployment {
        shutdown.url = "/my/shutdown/path"
    }
}

这个 endpoint 没有办法设定 authentication,为了安全上的考量,我在 url 加上一长串乱数当作 key,而这个乱数可以透过环境变数设定
shutdown.url = "/ops/server/shutdown/"${?SERVER_SHUTDOWN_KEY}

然後部署时准备 stop.sh 档案执行 curl 指令呼叫 endpoint
curl "http://localhost:$PORT/ops/server/shutdown/$SERVER_SHUTDOWN_KEY"

Ktor ApplicationEvent

当 ShutDownUrl Plugin 执行 shutdown 时,会触发 Ktor ApplicationEvent 的 ApplicationStopPreparing 事件,所以我们可以事先订阅此事件,把想要执行的程序码实作为 lambda 传入

fun stopPreparing(application: Application) { log("Application stopPreparing: $it") }

application.environment.monitor.subscribe(ApplicationStopPreparing, ::stopPreparing) // subscribe

不过因为我有导入 Koin DI,而且 Koin 也有整合 Ktor ApplicationEvent,所以我改订阅 Koin 的 KoinApplicationStopPreparing 事件

以下是 Shutdown 时的各个 ApplicationEvent 的执行顺序
Ktor ApplicationStopPreparing → KoinApplicationStopPreparing → KoinApplicationStopped → Ktor ApplicationStopping → Ktor ApplicationStopped

现在我们已经知道可以透过 subscribe ApplicationEvent,让 Ktor 执行我们的 shutdown 程序码,那麽我们应该要在那里呼叫 subscribe 呢?

Ktor Plugins Graceful Shutdown

我们是透过安装 Ktor Plugin 来增加功能,然後在 Plugin 的 install function 里面进行资源初始化动作,例如 Server 启动时,Redis Plugin 会与 Redis 建立连线,然後停止 Server 时,我们要呼叫 RedisClient 的 quit() 方法中断连线,那要怎麽呼叫 quit() 呢? 我们可以在建立 RedisClient 物件之後,就呼叫 subscribe 方法把呼叫 quit() 的 lambda 传入 KoinApplicationStopPreparing event 的 EventHandler

override fun install(pipeline: Application, configure: Configuration.() -> Unit): RedisFeature {
    //以上省略
   initClient(config)
   application.environment.monitor.subscribe(KoinApplicationStopPreparing) {
        runBlocking {
            logger.info("close redis connection...")
            client.quit()
            logger.info("redis connection closed")
        }
    }
}

同样地,我们所有实作的 Plugin 也都采用此作法,例如 Database Plugin 关闭 database connection pool,Logging Plugin 呼叫 coroutine channel 的 close() 方法,等待剩下的 log 都写入到 AWS Kinesis Stream 之後再停止。

当 Server 停止时,Ktor 会逐一呼叫 KoinApplicationStopPreparing 之中,所有在 Plugin 传入的 EventHandler lambda。不过在这里要注意呼叫的顺序! 例如我希望系统能在一启动 Server 就能开始写入 log,然後停止 Server 时,LogWriter coroutine channel 是最後才被 close,确保系统运行时的所有重要 log 都能被记录下来,所以此时要考虑多个 Plugin 的安装顺序,甚至 depenency 关系。

Ktor ApplicationEvent 的内部实作是把所有 EventHandler 储存在一个 List 物件里面,所以会按照 Plugin 的安装顺序逐一呼叫,但我希望是相反的顺序呼叫,所以我实作了 KoinApplicationShutdownManager,在安装所有 Plugin 之後呼叫 complete() 方法,以相反的顺序 subscribe KoinApplicationStopPreparing

fun Application.main() {
    install(LoggingFeature)
    install(DatabaseFeature)
    install(RedisFeature)
    //省略... 
    KoinApplicationShutdownManager.complete(environment)
}

object KoinApplicationShutdownManager {

    private val tasks: MutableList<EventHandler<KoinApplication>> = mutableListOf()

    fun register(handler: EventHandler<KoinApplication>) {
        tasks += handler
    }

    fun complete(applicationEnvironment: ApplicationEnvironment) {
        tasks.asReversed().forEach {
            applicationEnvironment.monitor.subscribe(KoinApplicationStopPreparing, it)
        }
    }
}

到目前为止,我已经说明了 Ktor Plugin 的安装到最後的 graceful shutdown,明天说明我如何为 Ktor 加上 i18n 机制


<<:  [Day 02] 建立开发环境,做好行前准备

>>:  2021-Day10. 第一印象很重要!!从「加入群组」时,就建立良好关系:Line加群组欢迎讯息实作教学

Day 12 - Length of Last Word

大家好,我是毛毛。ヾ(´∀ ˋ)ノ 废话不多说开始今天的解题Day~ 58. Length of L...

如何用 AppFollow 做关键字研究

在 AppFollow 上查目标市场的 auto suggestion 找在一些大关键字中排名比较...

Day22-pytorch(5)简单示范regression模型

先import各种会用到的套件 使用sklearn的datasets套件建立要regression的...

【在厨房想30天的演算法】Day 15 演算法 : 排序 sort II 堆积、合并、快速

Aloha!又是我少女人妻终於来到第 15 天了~不知不觉就过了一半了,大家有听过跑者愉悦理论吗,就...

Day-14 Disk很大,你忍一下

Disk很大,你忍一下 tags: IT铁人 小故事 既然这篇要讲硬碟,先问各位一下,电脑一般来说我...