[Day 21] Using Coroutine SendChannel to Handle Asynchronous Work

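Besides accepting and responding to user requests in real time, a system also has to run various kinds of asynchronous work, such as background scheduling and sending notifications. I could call the CompletableFuture, Akka Actor or Coroutine Channel APIs directly to move the work onto another thread. But there are many implementation details behind that, such as ThreadPool tuning and error handling. I want every kind of asynchronous work to handle these non-business-logic details in one consistent way.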

Goals

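  • Create an XXXCoroutineActor for each kind of asynchronous work with a specific purpose. Internally, XXXCoroutineActor uses a Coroutine SendChannel (which can also be called an Actor here). It does not expose implementation details such as CoroutineDispatcher, CoroutineExceptionHandler or ThreadFactory, so callers only need to call XXXCoroutineActor.send(message)
  • ThreadPool parameters can be set in an external configuration file, so each kind of asynchronous work can be tuned for performance separately
  • When the server stops, it waits for the work already in the channel to finish, achieving a graceful shutdown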

CoroutineActor Configuration

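Each kind of asynchronous work needs an asyncExecutor section in the application.conf configuration file. Currently the only asyncExecutor implementation is coroutineActor:

  • coroutines is the number of coroutines to launch
  • dispatcher holds the thread pool parameters. Set either fixedPoolSize alone, or minPoolSize, maxPoolSize and keepAliveTime together

If dispatcher is omitted, the actor falls back to the default dispatcher its caller supplies (Dispatchers.IO for DBAsyncTaskCoroutineActor).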

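asyncExecutor {
    coroutineActor {
        coroutines = 1
        dispatcher {
            # option 1: a fixed-size pool
            fixedPoolSize = 1
            # option 2: configure all three settings below together instead
            # (keepAliveTime is interpreted in seconds by ThreadPoolUtils)
            # minPoolSize = 1
            # maxPoolSize = 3
            # keepAliveTime = 1000000000
        }
    }
}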

Below are the corresponding configuration data classes. ThreadPoolConfig implements validate() to check that the configured values are legal.

data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {

    fun validate() {
        dispatcher?.validate()
    }

    class Builder {

        var coroutines: Int = 1
        private var dispatcher: ThreadPoolConfig? = null

        fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
            dispatcher = ThreadPoolConfig.Builder().apply(block).build()
        }

        fun build(): CoroutineActorConfig {
            return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
        }
    }
}

data class ThreadPoolConfig(
    val fixedPoolSize: Int? = 1,
    val minPoolSize: Int? = null,
    val maxPoolSize: Int? = null,
    val keepAliveTime: Long? = null
) : ValidateableConfig {

    fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)

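    override fun validate() {
        // either all three dynamic-pool settings are set, or none of them is
        require(
            (minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
                    (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
        ) {
            "minPoolSize, maxPoolSize, keepAliveTime should be configured together"
        }
        // without the dynamic-pool settings, a fixed pool size is required
        require(minPoolSize != null || fixedPoolSize != null) {
            "fixedPoolSize should be configured when minPoolSize, maxPoolSize, keepAliveTime are not"
        }
    }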

    class Builder {

        var fixedPoolSize: Int? = 1
        var minPoolSize: Int? = null
        var maxPoolSize: Int? = null
        var keepAliveTime: Long? = null

        fun build(): ThreadPoolConfig {
            return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
        }
    }
}
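ThreadPoolConfig.validate() checks that the three dynamic-pool settings are configured together. The ThreadPoolExecutor constructor has further rules. It throws IllegalArgumentException when corePoolSize < 0, maximumPoolSize <= 0, maximumPoolSize < corePoolSize, or keepAliveTime < 0. A stricter check can catch these at startup instead. The following is a minimal sketch; checkPoolSettings is a hypothetical helper, not part of the original code.

```kotlin
// Hypothetical helper: the either/or rule of ThreadPoolConfig plus the
// argument constraints of java.util.concurrent.ThreadPoolExecutor.
fun checkPoolSettings(
    fixedPoolSize: Int?,
    minPoolSize: Int?,
    maxPoolSize: Int?,
    keepAliveTime: Long?
) {
    when {
        // Option 1: fixed pool, only fixedPoolSize is used
        minPoolSize == null && maxPoolSize == null && keepAliveTime == null ->
            require(fixedPoolSize != null && fixedPoolSize > 0) {
                "fixedPoolSize must be > 0 when min/max/keepAlive are not set"
            }
        // Option 2: dynamic pool, all three values must be present and consistent
        minPoolSize != null && maxPoolSize != null && keepAliveTime != null -> {
            require(minPoolSize >= 0) { "minPoolSize must be >= 0" }
            require(maxPoolSize > 0 && maxPoolSize >= minPoolSize) {
                "maxPoolSize must be > 0 and >= minPoolSize"
            }
            require(keepAliveTime >= 0) { "keepAliveTime must be >= 0" }
        }
        else -> throw IllegalArgumentException(
            "minPoolSize, maxPoolSize, keepAliveTime must be configured together"
        )
    }
}
```

As with isFixedThreadPool(), fixedPoolSize is ignored once the dynamic settings are present.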

For each specific kind of asynchronous work, create a dedicated CoroutineActor. DBAsyncTaskCoroutineActor is the example below.

The Database plugin initializes DBAsyncTaskCoroutineActor

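The Database plugin first reads CoroutineActorConfig from the Ktor configuration file. It then creates DBAsyncTaskCoroutineActor and registers it with Koin DI. It also registers the actor's shutdown() with KoinApplicationShutdownManager, so shutdown() is called when the server stops.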

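override fun install(pipeline: Application, configure: Configuration.() -> Unit): DatabaseFeature {
    // ... (other database initialization omitted)
    val logWriter = pipeline.get<LogWriter>()
    // the asyncExecutor section of application.conf takes precedence over the programmatic configuration
    val config = appConfig.infra.database?.asyncExecutor ?: configuration.asyncExecutorConfig
    asyncExecutor = DBAsyncTaskCoroutineActor(config.coroutineActor, logWriter)
    pipeline.koin {
        modules(
            module(createdAtStart = true) {
                single { asyncExecutor }
            }
        )
    }

    // close the actor's channel when the server stops, letting queued tasks finish
    KoinApplicationShutdownManager.register { asyncExecutor?.shutdown() }

    // ... (remaining code omitted; install must return the DatabaseFeature instance)
}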
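KoinApplicationShutdownManager is the author's own class, and its code is not shown. The sketch below is a hypothetical illustration of what such a registry could look like. ShutdownHooks and its name parameter are my own assumptions. Hooks run in reverse registration order, so components registered later (which may depend on earlier ones) stop first. A failing hook does not prevent the remaining hooks from running.

```kotlin
// Hypothetical shutdown registry; not the actual KoinApplicationShutdownManager.
object ShutdownHooks {
    private val hooks = mutableListOf<Pair<String, () -> Unit>>()

    @Synchronized
    fun register(name: String, hook: () -> Unit) {
        hooks += name to hook
    }

    // Runs hooks in reverse registration order and returns the names of hooks that failed.
    @Synchronized
    fun runAll(): List<String> {
        val failed = mutableListOf<String>()
        for ((name, hook) in hooks.asReversed()) {
            try {
                hook()
            } catch (e: Exception) {
                failed += name // keep going so the other components still shut down
            }
        }
        hooks.clear()
        return failed
    }
}
```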

DBAsyncTaskCoroutineActor writes data to the database asynchronously

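After a user logs in successfully, the user's app device information is recorded in the database asynchronously. I get the DBAsyncTaskCoroutineActor object from Koin DI and call its run function to update the user_device table.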

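// returns the task ID immediately; the insert runs later in a DB transaction on the actor's worker coroutine
dbAsyncTaskCoroutineActor.run("createUserDevice") {
    UserDeviceTable.insert(form.toCreateUserDeviceDTO())
}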

Implementing DBAsyncTaskCoroutineActor

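Below is the DBAsyncTaskCoroutineActor code. Calling run creates a DBAsyncTask and hands it to a generic CoroutineActor that knows nothing about the business logic, then returns the task ID without waiting. A worker coroutine takes the task from the other end of the coroutine channel and calls execute. execute opens a DB transaction and runs the task's Transaction.() -> Any? lambda.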

The Database plugin has already registered shutdown(). So when the server stops, shutdown() calls channel.close() and waits for all DBAsyncTasks to finish before stopping. For details on gracefully stopping a coroutine channel, see my earlier article [Day 10] Ktor Graceful Shutdown.
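The article does not show DBAsyncTask itself. run() returns task.id before the task has executed, so the ID must be assigned when the task object is created. The following is a hypothetical sketch of that shape. The IdentifiableObject interface here is my assumption about the bound used by CoroutineActor<T>. Exposed's Transaction receiver is replaced by a type parameter R so the example compiles without Exposed.

```kotlin
import java.util.UUID

// Assumed shape of the IdentifiableObject bound used by CoroutineActor<T>.
interface IdentifiableObject<ID> {
    val id: ID
}

// Hypothetical task; R stands in for Exposed's Transaction receiver.
class DBAsyncTask<R>(
    val type: String,
    val block: R.() -> Any?
) : IdentifiableObject<UUID> {
    // Generated at construction time, so run() can return it immediately.
    override val id: UUID = UUID.randomUUID()

    override fun toString(): String = "DBAsyncTask(id=$id, type=$type)"
}
```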

class DBAsyncTaskCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val logWriter: LogWriter
) {

    private val logger = KotlinLogging.logger {}

    private val actorName = "DBAsyncTaskActor"

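    private val actor: CoroutineActor<DBAsyncTask> = CoroutineActor(
        name = actorName,
        capacity = Channel.UNLIMITED,                 // sending never has to wait for buffer space
        coroutineActorConfig = coroutineActorConfig,
        defaultCoroutineDispatcher = Dispatchers.IO,  // used when no dispatcher is configured
        processBlock = this::execute,
        produceBlock = null,
        logWriter = logWriter
    )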

    fun run(type: String, block: Transaction.() -> Any?): UUID {
        val task = DBAsyncTask(type, block)
        actor.sendToUnlimitedChannel(task, InfraResponseCode.DB_ASYNC_TASK_ERROR) // non-blocking by Channel.UNLIMITED
        return task.id
    }

    private fun execute(task: DBAsyncTask) {
        try {
            transaction {
                task.block(this)
            }
        } catch (e: Throwable) {
            val errorMsg = "$actorName execute error"
            logger.error("$errorMsg => $task", e)
            logWriter.write(
                ErrorLog.internal(
                    InternalServerException(
                        InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
                        mapOf("taskId" to task.id, "taskType" to task.type)
                    ),
                    actorName, task.id.toString()
                )
            )
        }
    }

    internal fun shutdown() {
        actor.close()
    }
}

Business-agnostic implementation details inside CoroutineActor

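Creating a CoroutineScope with a CoroutineDispatcher and a CoroutineExceptionHandler

  • The CoroutineDispatcher is created from the CoroutineActorConfig values in the external configuration file. If no dispatcher is configured, the defaultCoroutineDispatcher passed in by the caller is used
  • The current CoroutineExceptionHandler only logs the error. A more proactive approach is to persist the failed task data so it can be retried later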
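The following sketch shows what the handler actually sees. It builds a scope the same way CoroutineActor does: a dedicated dispatcher plus a CoroutineExceptionHandler, with no explicit Job. In that setup, CoroutineScope(context) adds a plain Job. An exception thrown from a launched coroutine is delivered to the handler, and the scope's Job is cancelled as well. failOneTask is a hypothetical name used only for this illustration.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference

// Returns the message seen by the handler and whether the scope's Job got cancelled.
fun failOneTask(): Pair<String?, Boolean> {
    val caught = AtomicReference<String?>(null)
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    val handler = CoroutineExceptionHandler { _, e -> caught.set(e.message) }
    val scope = CoroutineScope(dispatcher + handler) // plain Job() is added implicitly
    try {
        runBlocking {
            // join() waits for the failed coroutine; it does not rethrow the exception
            scope.launch { throw IllegalStateException("task failed") }.join()
        }
    } finally {
        dispatcher.close()
    }
    return caught.get() to scope.coroutineContext[Job]!!.isCancelled
}
```

Because the whole scope is cancelled after one failure, any other coroutines launched in it stop as well. That is why the processing lambda should catch its own exceptions.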

Other parameters

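  • capacity: the buffer size of the coroutine channel. Once the buffer is full, send() suspends until space becomes available
  • processBlock: the lambda that processes each task
  • produceBlock: an optional lambda that feeds tasks into the channel; it is invoked right after the CoroutineActor is created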
class CoroutineActor<T : IdentifiableObject<*>>(
    val name: String,
    capacity: Int,
    coroutineActorConfig: CoroutineActorConfig,
    defaultCoroutineDispatcher: CoroutineDispatcher,
    processBlock: suspend (T) -> Unit,
    produceBlock: (CoroutineScope.(SendChannel<T>) -> Unit)? = null,
    private val logWriter: LogWriter? = null
) {

    private val logger = KotlinLogging.logger {}

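    private val dispatcher: CoroutineDispatcher
    // true only when this actor created its own thread pool and is therefore responsible for closing it
    private val ownsDispatcher: Boolean = coroutineActorConfig.dispatcher != null
    private val channel: SendChannel<T>
    private val coroutineScope: CoroutineScope

    init {
        logger.info("========== init coroutine actor $name ... ==========")
        try {
            dispatcher = if (coroutineActorConfig.dispatcher != null)
                CoroutineUtils.createDispatcher(name, coroutineActorConfig.dispatcher)
            else defaultCoroutineDispatcher

            // Note: processBlock should catch all exceptions in most cases. CoroutineScope(context) adds a
            // plain Job, so one failed worker cancels the scope together with all other workers.
            val exceptionHandler = CoroutineExceptionHandler { ctx, e ->
                logger.error("coroutine actor uncaught exception => $ctx", e)
            }
            val context = dispatcher + exceptionHandler
            coroutineScope = CoroutineScope(context)
            channel = CoroutineUtils.createActor(
                name, capacity, coroutineActorConfig.coroutines,
                coroutineScope, processBlock
            )

            produceBlock?.invoke(coroutineScope, channel)
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.LOG_ERROR, "fail to init coroutine actor $name", e)
        }
        logger.info("========== init coroutine actor $name completed ==========")
    }

    // Call channel.close() (never channel.cancel()) so queued tasks can finish when the server shuts down.
    // SendChannel.close() by itself does not wait; this relies on CoroutineUtils.closeChannel waiting until
    // the workers have drained the channel (see [Day 10] Ktor Graceful Shutdown) before the scope is cancelled.
    fun close() {
        logger.info("coroutine actor $name close ...")
        CoroutineUtils.closeChannel(name, channel)
        coroutineScope.cancel(name)
        // Only close a thread pool this actor created. Shared dispatchers such as Dispatchers.IO may also be
        // ExecutorCoroutineDispatcher instances, and calling close() on them throws.
        if (ownsDispatcher && dispatcher is ExecutorCoroutineDispatcher) {
            CoroutineUtils.closeDispatcher(name, dispatcher)
        }
        logger.info("coroutine actor $name closed")
    }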
}
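SendChannel.close() only rejects new sends. It does not wait for the consumers. The waiting comes from joining the worker coroutines, which leave their for (e in channel) loop once the channel is closed and fully drained. The body of CoroutineUtils.closeChannel is not shown here (it is covered in the Day 10 article). The self-contained sketch below illustrates the same idea under my own assumptions. MiniActor is hypothetical. It keeps the Jobs it launches, joins them after close(), and only closes the dispatcher it created itself.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors

// Hypothetical minimal actor, used only to illustrate a graceful close.
class MiniActor<T>(workers: Int, private val process: suspend (T) -> Unit) {
    // This actor owns its dispatcher, so it is also responsible for closing it.
    private val dispatcher = Executors.newFixedThreadPool(workers).asCoroutineDispatcher()
    // SupervisorJob: one failing worker does not cancel its siblings.
    private val scope = CoroutineScope(dispatcher + SupervisorJob())
    private val channel = Channel<T>(Channel.UNLIMITED)
    private val jobs = List(workers) {
        scope.launch(CoroutineName("mini-actor-${it + 1}")) {
            for (message in channel) process(message) // loop ends when closed and drained
        }
    }

    // Non-suspending send; returns false once the actor has been closed.
    fun send(message: T): Boolean = channel.trySend(message).isSuccess

    fun close() = runBlocking {
        channel.close()    // reject new messages, keep the queued ones
        jobs.joinAll()     // wait until the workers have processed everything
        scope.cancel()     // nothing is running any more at this point
        dispatcher.close() // safe: this executor was created by the actor itself
    }
}
```

Usage: create MiniActor<Int>(2) { ... }, call send() from any thread, and call close() from the shutdown hook. Every message accepted before close() is processed before close() returns.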

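Below is the code of CoroutineUtils.createActor(). I don't use CoroutineScope.actor() to create the SendChannel because the current actor implementation is rather simple and doesn't meet my needs. For example, actor() starts a single coroutine, whereas I want a configurable number of coroutines consuming the same channel. The official API documentation also notes: This API will become obsolete in future updates with introduction of complex actors. See issue #87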

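In addition, I give each coroutine a CoroutineName to make tracing and debugging easier. Remember to also set the JVM option -Dkotlinx.coroutines.debug. In debug mode, the coroutine name is appended to the thread name, so it shows up in the logs.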

fun <E> createActor(
    name: String, capacity: Int, coroutines: Int,
    scope: CoroutineScope,
    block: suspend (E) -> Unit
): SendChannel<E> {
    require(coroutines > 0)

    val channel = Channel<E>(capacity)
    repeat(coroutines) {
        scope.launch(CoroutineName("$name-(${it + 1})")) {
            for (e in channel) {
                logger.debug { coroutineContext }
                block(e)
            }
        }
    }
    return channel
}
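As the comment in CoroutineActor says, the processing block should catch its own exceptions. Otherwise one failing message ends its worker, and through the scope's plain Job, the other workers too. One possible hardening is to catch per message inside the loop while letting CancellationException propagate. The sketch below is a hypothetical variant of createActor; createResilientActor and its onError callback are my own names, not the author's code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical variant of createActor whose workers survive failing messages.
fun <E> createResilientActor(
    name: String, capacity: Int, coroutines: Int,
    scope: CoroutineScope,
    onError: (E, Throwable) -> Unit,
    block: suspend (E) -> Unit
): SendChannel<E> {
    require(coroutines > 0)
    val channel = Channel<E>(capacity)
    repeat(coroutines) {
        scope.launch(CoroutineName("$name-(${it + 1})")) {
            for (e in channel) {
                try {
                    block(e)
                } catch (ce: CancellationException) {
                    throw ce // keep cooperative cancellation working
                } catch (t: Throwable) {
                    onError(e, t) // report and continue with the next message
                }
            }
        }
    }
    return channel
}
```

The onError callback is where the failed message could be logged or persisted for a later retry, as suggested for the CoroutineExceptionHandler.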

Implementing a ThreadFactory that sets the thread name and UncaughtExceptionHandler

As with coroutines, I recommend that the ThreadFactory give every thread it creates a name and an uncaughtExceptionHandler. Then the asCoroutineDispatcher() extension function converts the Java ExecutorService into a CoroutineDispatcher.

object ThreadPoolUtils {

    private val logger = KotlinLogging.logger {}

    fun createThreadPoolExecutor(
        threadNamePrefix: String,
        config: ThreadPoolConfig,
        handler: Thread.UncaughtExceptionHandler? = null
    ): ExecutorService {
        logger.info("init thread pool $threadNamePrefix ... $config")
        val factory = DefaultThreadFactory(threadNamePrefix, handler)

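        return if (config.isFixedThreadPool())
            Executors.newFixedThreadPool(config.fixedPoolSize!!, factory)
        else
            // Note: with an unbounded LinkedBlockingQueue, ThreadPoolExecutor never creates more than
            // minPoolSize (corePoolSize) threads, so maxPoolSize has no effect. keepAliveTime is in seconds.
            ThreadPoolExecutor(
                config.minPoolSize!!, config.maxPoolSize!!,
                config.keepAliveTime!!, TimeUnit.SECONDS,
                LinkedBlockingQueue(), factory
            )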
    }

    private class DefaultThreadFactory(
        private val namePrefix: String,
        val handler: Thread.UncaughtExceptionHandler? = null
    ) : ThreadFactory {

        private val backingThreadFactory: ThreadFactory = Executors.defaultThreadFactory()

        override fun newThread(r: Runnable): Thread {
            val thread = backingThreadFactory.newThread(r)
            thread.name = "$namePrefix-${thread.name}"
            if (handler != null)
                thread.uncaughtExceptionHandler = handler
            else
                thread.setUncaughtExceptionHandler { t, e ->
                    logger.error("Thread Uncaught Error => ${t.name}", e)
                }
            return thread
        }
    }
}
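The non-fixed branch has a caveat. The ThreadPoolExecutor documentation states that with an unbounded queue, such as a LinkedBlockingQueue without a capacity, no more than corePoolSize threads are ever created. maximumPoolSize then has no effect. keepAliveTime also has no effect unless core threads are allowed to time out. The stdlib-only check below makes this visible. observeUnboundedPool is a hypothetical helper, and the pool sizes are arbitrary example values.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// Submits `tasks` blocking tasks to a pool configured like the non-fixed branch
// and returns (number of threads, number of queued tasks) while they are blocked.
fun observeUnboundedPool(core: Int, max: Int, tasks: Int): Pair<Int, Int> {
    val pool = ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, LinkedBlockingQueue())
    val release = CountDownLatch(1)
    try {
        repeat(tasks) { pool.execute { release.await() } }
        return pool.poolSize to pool.queue.size
    } finally {
        release.countDown() // let the blocked tasks finish
        pool.shutdown()
        pool.awaitTermination(5, TimeUnit.SECONDS)
    }
}
```

With core = 1 and max = 3, five blocked tasks leave one thread running and four tasks waiting in the queue. To let a pool actually grow toward maxPoolSize, a bounded queue is needed, together with a plan for what happens when it is full.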

Sending tasks to a CoroutineActor

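After the CoroutineActor is created and initialized, the next step is to send tasks to the coroutine channel. In practice, the buffer size should depend on the kind of work and the scenario, so that a backlog of tasks cannot exhaust memory. Here, however, I assume the server has plenty of memory and default to Channel.UNLIMITED. So although trySendBlocking is called, the channel never fills up, and the call can be treated as non-blocking.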

Note that the channel may already be closed. If it is, I first record an ErrorLog for the undelivered task through LogWriter, and will add a retry mechanism later.

// suspends when the channel buffer is full
suspend fun send(message: T) {
    channel.send(message)
}

// not a suspend function, and effectively non-blocking because the channel is Channel.UNLIMITED
fun sendToUnlimitedChannel(message: T, errorCode: ResponseCode) {
    val result = channel.trySendBlocking(message)
    if (result.isFailure) {
        val e = result.exceptionOrNull()
        val errorMsg = if (result.isClosed) "$name is closed" else "$name unexpected error"
        if (result.isClosed)
            logger.warn("$errorMsg => $message")
        else
            logger.error("$errorMsg => $message", e) // we never call channel.cancel()
        if (errorCode != InfraResponseCode.LOG_ERROR) {
            logWriter?.write(
                ErrorLog.internal(
                    InternalServerException(errorCode, errorMsg, e, mapOf("message" to message)),
                    name, message.id.toString()
                )
            )
            // TODO: persist the undelivered message and retry later
        }
    }
}
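The next sketch checks two behaviors directly with trySend, the non-blocking counterpart of trySendBlocking. The first is the difference between a bounded buffer and Channel.UNLIMITED. The second is what sendToUnlimitedChannel sees after close(). trySendAll is a hypothetical helper, and the capacity of 2 is an arbitrary example value.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult

// Sends 1..count without suspending or blocking and returns every ChannelResult.
// A full bounded channel yields a failed (but not closed) result;
// trySendBlocking would block the calling thread at that point instead.
fun trySendAll(channel: Channel<Int>, count: Int): List<ChannelResult<Unit>> =
    (1..count).map { channel.trySend(it) }
```

With Channel(2), the third message fails because the buffer is full. With Channel.UNLIMITED, every send succeeds until the channel is closed. After that, the result reports isClosed, which is the branch sendToUnlimitedChannel logs as "$name is closed".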
