系统除了即时接受及回应使用者请求,也需要执行各种非同步工作,例如背景排程及寄送讯息通知…等。在实作上,虽然我可以直接使用 CompletableFuture, Akka Actor, Coroutine Channel 的 API,将工作切换到另一个执行绪处理,但其实背後有许多实作细节需要考虑,例如 ThreadPool 参数调校、错误处理…等,我希望各种不同类型的非同步工作,能采用一致的方式处理这些非商业逻辑的细节。
XXXCoroutineActor
,XXXCoroutineActor
内部实作使用 Coroutine SendChannel (在此也可称为 Actor),而且不对外曝露 CoroutineDispatcher
, CoroutineExceptionHandler
, ThreadFactory
…等实作细节,所以使用方式仅仅是呼叫 XXXCoroutineActor.send(message)
方法而已点击以下连结至实作程序码
每一种非同步工作都要在 application.conf 设定档设定 asyncExecutor
,目前 asyncExecutor 的实作只有coroutineActor
。coroutines
是想建立的 coroutine 数量,dispatcher 可设定 threadpool 参数,fixedPoolSize
或 minPoolSize
, maxPoolSize
, keepAliveTime
二择一设定。
asyncExecutor {
coroutineActor {
coroutines = 1
dispatcher {
fixedPoolSize = 1
minPoolSize = 1
maxPoolSize = 3
keepAliveTime = 1000000000
}
}
}
下面是对应的 configuration data class,其中 ThreadPoolConfig
实作 validate()
检查设定值是否合法。
data class CoroutineActorConfig(val coroutines: Int = 1, val dispatcher: ThreadPoolConfig? = null) {
fun validate() {
dispatcher?.validate()
}
class Builder {
var coroutines: Int = 1
private var dispatcher: ThreadPoolConfig? = null
fun dispatcher(block: ThreadPoolConfig.Builder.() -> Unit) {
dispatcher = ThreadPoolConfig.Builder().apply(block).build()
}
fun build(): CoroutineActorConfig {
return CoroutineActorConfig(coroutines, dispatcher).apply { validate() }
}
}
}
data class ThreadPoolConfig(
val fixedPoolSize: Int? = 1,
val minPoolSize: Int? = null,
val maxPoolSize: Int? = null,
val keepAliveTime: Long? = null
) : ValidateableConfig {
fun isFixedThreadPool(): Boolean = fixedPoolSize != null && (minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
override fun validate() {
require(
(minPoolSize != null && maxPoolSize != null && keepAliveTime != null) ||
(minPoolSize == null && maxPoolSize == null && keepAliveTime == null)
) {
"minPoolSize, maxPoolSize, keepAliveTime should be configured"
}
}
class Builder {
var fixedPoolSize: Int? = 1
var minPoolSize: Int? = null
var maxPoolSize: Int? = null
var keepAliveTime: Long? = null
fun build(): ThreadPoolConfig {
return ThreadPoolConfig(fixedPoolSize, minPoolSize, maxPoolSize, keepAliveTime).apply { validate() }
}
}
}
Database Plugin 先从 Ktor 设定档读取 CoroutineActorConfig
,再建立 DBAsyncTaskCoroutineActor
,并注册至 Koin DI
override fun install(pipeline: Application, configure: Configuration.() -> Unit): DatabaseFeature {
val logWriter = pipeline.get<LogWriter>()
val config = appConfig.infra.database?.asyncExecutor ?: configuration.asyncExecutorConfig
asyncExecutor = DBAsyncTaskCoroutineActor(config.coroutineActor, logWriter)
pipeline.koin {
modules(
module(createdAtStart = true) {
single { asyncExecutor }
}
)
}
KoinApplicationShutdownManager.register { asyncExecutor?.shutdown() }
}
当使用者成功登入之後,以非同步方式记录使用者 App 装置资讯於资料库,所以我透过 Koin DI 拿到 DBAsyncTaskCoroutineActor
物件,然後执行 run
函式更新资料库的 user_device 资料表
dbAsyncTaskCoroutineActor.run("createUserDevice") {
UserDeviceTable.insert(form.toCreateUserDeviceDTO())
}
以下是 DBAsyncTaskCoroutineActor 程序码,当我们呼叫 run
函式时,内部实作是建立一个 DBAsyncTask
丢到一个无关功能逻辑的 CoroutineActor 里面,然後从 coroutine channel 另一端取出後再执行 execute
函式,建立 DB Transaction 执行 DBAsyncTask
的 Transaction.() -> Any?
lambda。
另一方面,当停止 Server 时,因为先前已经在 Database Plugin 注册 shutdown()
函式,所以会呼叫 channel.close()
,等所有的 DBAsyncTask
执行完毕後再停止。详细实作 coroutine channel graceful stop 的作法,可参考我之前写的文章 [Day 10] Ktor Graceful Shutdown
class DBAsyncTaskCoroutineActor(
coroutineActorConfig: CoroutineActorConfig,
private val logWriter: LogWriter
) {
private val logger = KotlinLogging.logger {}
private val actorName = "DBAsyncTaskActor"
private val actor: CoroutineActor<DBAsyncTask> = CoroutineActor(
actorName, Channel.UNLIMITED,
coroutineActorConfig, Dispatchers.IO,
this::execute, null,
logWriter
)
fun run(type: String, block: Transaction.() -> Any?): UUID {
val task = DBAsyncTask(type, block)
actor.sendToUnlimitedChannel(task, InfraResponseCode.DB_ASYNC_TASK_ERROR) // non-blocking by Channel.UNLIMITED
return task.id
}
private fun execute(task: DBAsyncTask) {
try {
transaction {
task.block(this)
}
} catch (e: Throwable) {
val errorMsg = "$actorName execute error"
logger.error("errorMsg => $task", e)
logWriter.write(
ErrorLog.internal(
InternalServerException(
InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
mapOf("taskId" to task.id, "taskType" to task.type)
),
actorName, task.id.toString()
)
)
}
}
internal fun shutdown() {
actor.close()
}
}
建立 CoroutineScope 包含 CorotuineDispatcher 及 CoroutineExceptionHandler
其它参数的意义
capacity
coroutine channel 的容量大小,如果工作数量超过上限的话,呼叫 send()
方法时必须等待processBlock
是执行工作的 lambdaproduceBlock
是传入工作的 lambda,会在 CoroutineActor 建立後立即呼叫class CoroutineActor<T : IdentifiableObject<*>>(
val name: String,
capacity: Int,
coroutineActorConfig: CoroutineActorConfig,
defaultCoroutineDispatcher: CoroutineDispatcher,
processBlock: suspend (T) -> Unit,
produceBlock: (CoroutineScope.(SendChannel<T>) -> Unit)? = null,
private val logWriter: LogWriter? = null
) {
private val logger = KotlinLogging.logger {}
private val dispatcher: CoroutineDispatcher
private val channel: SendChannel<T>
private val coroutineScope: CoroutineScope
init {
logger.info("========== init coroutine actor $name ... ==========")
try {
dispatcher = if (coroutineActorConfig.dispatcher != null)
CoroutineUtils.createDispatcher(name, coroutineActorConfig.dispatcher)
else defaultCoroutineDispatcher
// Note: block should catch all exceptions in most cases
val exceptionHandler = CoroutineExceptionHandler { ctx, e ->
logger.error("coroutine actor uncaught exception => $ctx", e)
}
val context = dispatcher + exceptionHandler
coroutineScope = CoroutineScope(context)
channel = CoroutineUtils.createActor(
name, capacity, coroutineActorConfig.coroutines,
coroutineScope, processBlock
)
produceBlock?.invoke(coroutineScope, channel)
} catch (e: Throwable) {
throw InternalServerException(InfraResponseCode.LOG_ERROR, "fail to init coroutine actor $name", e)
}
logger.info("========== init coroutine actor $name completed ==========")
}
// call channel.close() to wait task completed when shutdown server (don't call channel.cancel())
fun close() {
logger.info("coroutine actor $name close ...")
CoroutineUtils.closeChannel(name, channel)
coroutineScope.cancel(name)
if (dispatcher is ExecutorCoroutineDispatcher) {
CoroutineUtils.closeDispatcher(name, dispatcher)
}
logger.info("coroutine actor $name closed")
}
}
下面是 CoroutineUtils.createActor()
的程序码,在这里没有使用 CoroutineScope.actor()
函式建立 SendChannel 是因为目前 actor 的实作有点简单,无法满足我的需求,而且官方 API 文件也有加注 This API will become obsolete in future updates with introduction of complex actors.
See issue #87。
此外,我为每个 coroutine 指定 CoroutineName
方便 Trace & Debug。记得要一并设定 JVM Option -Dkotlinx.coroutines.debug
fun <E> createActor(
name: String, capacity: Int, coroutines: Int,
scope: CoroutineScope,
block: suspend (E) -> Unit
): SendChannel<E> {
require(coroutines > 0)
val channel = Channel<E>(capacity)
repeat(coroutines) {
scope.launch(CoroutineName("$name-(${it + 1})")) {
for (e in channel) {
logger.debug { coroutineContext }
block(e)
}
}
}
return channel
}
与 coroutine 同样的做法,建议 ThreadFactory
建立 thread 时也指定 thread name 及 uncaughtExceptionHandler。然後再使用 ExecutorService 的 asCoroutineDispatcher()
方法,把 Java ExecutorService 转为 Coroutine Dispatcher
object ThreadPoolUtils {
private val logger = KotlinLogging.logger {}
fun createThreadPoolExecutor(
threadNamePrefix: String,
config: ThreadPoolConfig,
handler: Thread.UncaughtExceptionHandler? = null
): ExecutorService {
logger.info("init thread pool $threadNamePrefix ... $config")
val factory = DefaultThreadFactory(threadNamePrefix, handler)
return if (config.isFixedThreadPool())
Executors.newFixedThreadPool(config.fixedPoolSize!!, factory)
else
ThreadPoolExecutor(
config.minPoolSize!!, config.maxPoolSize!!,
config.keepAliveTime!!, TimeUnit.SECONDS,
LinkedBlockingQueue(), factory
)
}
private class DefaultThreadFactory(
private val namePrefix: String,
val handler: Thread.UncaughtExceptionHandler? = null
) : ThreadFactory {
private val backingThreadFactory: ThreadFactory = Executors.defaultThreadFactory()
override fun newThread(r: Runnable): Thread {
val thread = backingThreadFactory.newThread(r)
thread.name = "$namePrefix-${thread.name}"
if (handler != null)
thread.uncaughtExceptionHandler = handler
else
thread.setUncaughtExceptionHandler { t, e ->
logger.error("Thread Uncaught Error => ${t.name}", e)
}
return thread
}
}
}
建立及初始化 CoroutineActor 之後,接下来就是要传送工作给 coroutine channel。实务上要依工作种类及情境决定 channel buffer 的大小,避免累积太多工作耗光记忆体,不过我在此假设 Server 记忆体很大,所以预设值给定 Channel.UNLIMITED
,因此虽然是呼叫 trySendBlocking
,但实际上 channel 不会满,所以可视为 non-blocking 操作。
在这里要注意 channel 是否已经被 close,如果已被 close,此时使用 LogWriter 先把未完成工作的 ErrorLog 记录下来,未来再加入 Retry 机制。
// suspending
suspend fun send(message: T) {
channel.send(message)
}
// not a suspend function and non-blocking due to Channel.UNLIMITED
fun sendToUnlimitedChannel(message: T, errorCode: ResponseCode) {
val result = channel.trySendBlocking(message)
if (result.isFailure) {
var errorMsg = ""
val e = result.exceptionOrNull()
if (result.isClosed) {
errorMsg = "$name is closed"
logger.warn("$errorMsg => $message")
} else {
errorMsg = "$name unexpected error" // we never call channel.cancel()
logger.error("$errorMsg => $message", e)
}
if (errorCode != InfraResponseCode.LOG_ERROR) {
logWriter?.write(
ErrorLog.internal(
InternalServerException(errorCode, errorMsg, e, mapOf("message" to message)),
name, message.id.toString()
)
)
// TODO: persistence unDeliveredMessage and retry later
}
}
}
>>: # Day 17 Physical Memory Model (二)
由於管理是实现一个或多个目标的系统方法,我根据维基百科和NIST CSRC 术语表中的定义定义漏洞...
Switch和Toggle Button 今天要讲两个简单的元件,在还没碰到这两个元件之前,想要做到...
虽然好的模型和参数可以提高成效,但通常最关键还是资料本身。基本上资料的品质决定了八成以上模型的成效,...
CDN加速应用场景都有哪些? 一、网站加速 CDN加速的应用场景一览 适用於有加速需求的网站,包括门...
ORM 框架可以让开发者专注於物件的 CRUD 操作,不必直接思考 SQL 要怎麽写。如果是新增、修...