[Day 20] 实作 Logging Plugin 建立系统 Logging 机制

系统的 Log 除了基本的 Request Log 及 Error Log 之外,特定的功能也会有记录 Log 的需求,例如使用者登入时,需要记录来源 IP、装置及登入失败的原因;寄送讯息通知时,记录讯息类型、寄送方式(channel)及第三方服务的回应来判断是否成功寄送。即使这些 Log 的资料栏位及写入时机不相同,但我们仍需要有统一的方式处理。

实作目标

  • 不使用字串型态写入 Log,而是定义自己的 LogMessage 资料类别,再根据写入的目的地,转换为特定的资料格式,例如 JSON
  • 写入 Log 的目的地不限於档案,可以自行实作 LogWriter 写到任意目的地,例如资料库,或是转为 JSON 写入到 AWS Kinesis Data Firehose
  • 根据 LogMessageLogType 决定对应的 LogWriter 写入 log 到特定的目的地
  • 当实作新功能有记录 Log 的需求时,可以沿用既有的 LogWriter,只要实作 LogMessage 资料类别就好
  • 支援非同步写入 Log

那为什麽不使用 Logging Framework 就好呢? 例如 Logback

  • 我们在使用 Logging Library 写 log 的时候都是传入字串,写到档案之後再用 Logstash 剖析格式转换为结构化资料,然後再传送到 ElasticSearch。但是这部分我想要自己实作 LogMessage 资料类别,自定义 log 资料栏位,再自行转换各种资料格式,比较有弹性满足我的进阶需求
  • 我手边有 AWS EKK 的测试环境,但是 Logback 没有 Appender 可以使用,所以只能自己实作 AwsKinesisLogWriter 使用 Kinesis Firehose SDK 传送 JSON 到 ElasticaSearch (我没有在 EC2 安装 Kinesis Agent 转一手处理 log 资料)。AWS EKK 服务介绍可以看这篇文章 From ELK Stack to EKK: Aggregating and Analyzing Apache Logs with Amazon Elasticsearch Service, Amazon Kinesis, and Kibana

定义各种 Log 的父类别 LogMessage

每种功能的 Log 资料类别都要继承 LogMessage,至少包含以下3个栏位

  • occurAt => 事件发生时间
  • logType => 用来区分不同功能的 Log
  • logLevel => 可以做一些判断,决定要如何处理 log

各种 Log 资料类别的程序码在此

  • RequestLog (logType = request) => 系统收到 Http Request 的 log
  • ErrorLog (logType = error) => 系统错误 log
  • LoginLog (logType = login) => 使用者登入/登出 log
  • NotificationMessageLog (logType = notification_message) => 系统寄送讯息通知 log
abstract class LogMessage : IdentifiableObject<UUID>() {

    abstract val occurAt: Instant

    abstract val logType: String

    abstract val logLevel: LogLevel

    fun toJson(): JsonElement = json.encodeToJsonElement(this)
}

@Serializable
data class LoginLog(
    @Serializable(with = UUIDSerializer::class) val userId: UUID,
    val resultCode: LoginResultCode,
    @Serializable(with = InstantSerializer::class) override val occurAt: Instant,
    val project: String,
    val source: PrincipalSource,
    val tenantId: TenantId? = null,
    val clientId: String? = null,
    val clientVersion: String?,
    val ip: String? = null,
    val sid: String? = null
) : LogMessage() {

    @Serializable(with = UUIDSerializer::class)
    override val id: UUID = UUID.randomUUID()

    override val logType: String = LOG_TYPE

    override val logLevel: LogLevel = Log_Level

    companion object {
        const val LOG_TYPE = "login"
        private val Log_Level = LogLevel.INFO
    }
}

实作各种 LogWriter 写入 LogMessage 到不同目的地

目前 LogMessage 可以使用以下 LogWriter 写到不同目的地

interface LogWriter {

    fun write(message: LogMessage)

    fun shutdown() {}
}

实作 LogMessageDispatcher 根据 LogMessage 的 LogType 决定对应的 LogWriter

LogMessageDispatcher 也实作 LogWriter 介面,当呼叫 write(message: LogMessage) 方法时,会根据 LogMessage 的 LogType 找出事先已注册的 LogWriter,再呼叫 write(message: LogMessage) 方法写入 log

class LogMessageDispatcher(private val defaultLogWriter: LogWriter? = null) : LogWriter {

    private val logWriters: MutableMap<String, LogWriter> = mutableMapOf()

    fun register(logType: String, logWriter: LogWriter) {
        require(!logWriters.containsKey(logType))

        logWriters[logType] = logWriter
    }

    override fun write(message: LogMessage) {
        val logWriter = logWriters[message.logType] ?: defaultLogWriter ?: throw InternalServerException(
            InfraResponseCode.SERVER_CONFIG_ERROR, "logType ${message.logType} logWriter is not registered"
        )
        logWriter.write(message)
    }

    override fun shutdown() {
        logWriters.values.forEach { it.shutdown() }
    }
}

实作 LogMessageCoroutineActor 非同步写入 Log

这里使用 decorator pattern 的手法,让 LogMessageDispatcherLogMessageCoroutineActor 都实作 LogWriter 介面,所以呼叫 write(message: LogMessage) 方法时,会 delegate 到内部的 LogWriter,执行顺序会是 LogMessageCoroutineActor → LogMessageDispatcher → XXXLogWriterLogMessageCoroutineActor 内部是以 Coroutine SendChannel 达成非同步执行,更多 CoroutineActor 的实作细节,请参考 [Day 21] 使用 Coroutine SendChannel 处理非同步工作

class LogMessageCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val logWriter: LogWriter
) : LogWriter {

    private val logger = KotlinLogging.logger {}

    private val actorName = "LogWriterActor"

    private val actor: CoroutineActor<LogMessage> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute
    )

    override fun write(message: LogMessage) {
        actor.sendToUnlimitedChannel(message, InfraResponseCode.LOG_ERROR) // non-blocking by Channel.UNLIMITED
    }

    private fun execute(message: LogMessage) {
        try {
            logWriter.write(message)
        } catch (e: Throwable) {
            logger.error("$actorName execute error => $message", e)
        }
    }

    override fun shutdown() {
        actor.close()
        logWriter.shutdown()
    }
}

实作 Logging Plugin

Logging Configuration

Logging Plugin 会根据 application.conf 设定档,初始化建立对应的 LogWriter,然後注册至 Koin DI。因为 Request Log 及 Error Log 是系统最基本的 LogType,所以我会在 Logging Plugin 里面进行设定。以下面的设定值为例,Request Log 会使用 AwsKinesisLogWriter,Error Log 则是使用 ErrorLogDBWriter

logging {
    request {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
        includeHeaders = false
        includeQueryString = false
        includeResponseBody = false
        includeGetMethod = false
        excludePaths = ["/ops/sysstat/healthCheck"]
        excludeRequestBodyPaths = ["/login", "/myPassword"]
    }
    error {
        enabled = true
        destination = "Database" # File(default), Database, AwsKinesis
        // TODO integrate notification feature to notify OpsTeam by email, sms...
    }
    writer {
        awsKinesis {
            streamName = "logging"
            nettyClient {
                #maxConcurrency = 50 # => aws default value = 50
                #maxPendingConnectionAcquires = 10000 => aws default value = 10000
                #maxIdleConnectionTimeout = 60s => aws default value = 60s
            }
            threadPool {
                fixedPoolSize = 3
            }
        }
    }
    asyncExecutor {
        coroutineActor {
            coroutines = 1
            dispatcher {
                fixedPoolSize = 1
            }
        }
    }
}

初始化 LogMessageDispatcher、LogMessageCoroutineActor 注册至 Koin DI

下面是 Logging Plugin 的内部程序码,我会先建立 LogMessageDispatcher 物件,後续每种 LogType 的 LogWriter 要注册到 LogMessageDispatcher 里面,这样子写入 log 时就可以根据 LogMessage 的 LogType,取出对应的 LogWriter 执行写入动作。如果 LogType 没有设定对应的 LogWriter,就会使用预设的 FileLogWriter 写到档案。

pipeline.koin {
    modules(
        module(createdAtStart = true) {
            single { loggingConfig }

            val fileLogWriter = FileLogWriter()
            single { fileLogWriter }

            val logMessageDispatcher = LogMessageDispatcher(FileLogWriter())
            single { logMessageDispatcher }

            val awsKinesisLogWriter = loggingConfig.writer?.awsKinesis?.let {
                AwsKinesisLogWriter(it, serverConfig)
            }
            if (awsKinesisLogWriter != null)
                single { awsKinesisLogWriter }

            if (loggingConfig.request.enabled) {
                val requestLogWriter = when (loggingConfig.request.destination) {
                    LogDestination.File -> fileLogWriter
                    LogDestination.Database -> RequestLogDBWriter()
                    LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
                        InfraResponseCode.SERVER_CONFIG_ERROR, "AwsKinesisLogWriter is not configured"
                    )
                }
                logMessageDispatcher.register(RequestLog.LOG_TYPE, requestLogWriter)
            }
            if (loggingConfig.error.enabled) {
                val errorLogWriter = when (loggingConfig.error.destination) {
                    LogDestination.File -> fileLogWriter
                    LogDestination.Database -> ErrorLogDBWriter()
                    LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
                        InfraResponseCode.SERVER_CONFIG_ERROR, "kinesisLogWriter is not configured"
                    )
                }
                logMessageDispatcher.register(ErrorLog.LOG_TYPE, errorLogWriter)
            }

            val logWriter = loggingConfig.asyncExecutor?.let {
                LogMessageCoroutineActor(it.coroutineActor, logMessageDispatcher)
            } ?: logMessageDispatcher

            single { logWriter }

            KoinApplicationShutdownManager.register { logWriter.shutdown() }
        }
    )
}

SessionAuth Plugin 注册 LoginLog 的 LogWriter 至 LogMessageDispatcher 范例

先安装 Logging Plugin 初始化 LogMessageDispatcher 之後,SessionAuth Plugin 就可以透过 Koin DI 取出 LogMessageDispatcher 物件,再注册 LoginLog 使用 AwsKinesisLogWriter。当使用者登入、登出时,就可以使用 AwsKinesisLogWriter 写入 LoginLog。

sessionAuth {
    storageType = "Redis" # Redis
    redisKeyExpiredNotification = true
    session {
        expireDuration = 1d
        extendDuration = 15m
    }
    logging {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
    }
}
val loginLogWriter = when (authConfig.logging.destination) {
    LogDestination.File -> pipeline.get<FileLogWriter>()
    LogDestination.Database -> LoginLogDBWriter()
    LogDestination.AwsKinesis -> pipeline.get<AwsKinesisLogWriter>()
}
val logMessageDispatcher = pipeline.get<LogMessageDispatcher>()
logMessageDispatcher.register(LoginLog.LOG_TYPE, loginLogWriter)

Global Exception Handler 写入 ErrorLog 范例

系统错误可分为2种,一种是处理外部请求的错误,我们可以在 Global Exception Handler ,先透过 Koin DI 取得 LogWriter,再把 Exception 转为 ErrorLog 物件写入。

install(StatusPages) {
    val loggingConfig = get<LoggingConfig>()
    val logWriter = get<LogWriter>()
    val responseCreator = get<I18nResponseCreator>()

    exception<Throwable> { cause ->
        val e = ExceptionUtils.wrapException(cause)
        ExceptionUtils.writeLogToFile(e, call)
        
        if (loggingConfig.error.enabled) {
            logWriter.write(ErrorLog.request(e, call))
        }
        
        val errorResponse = responseCreator.createErrorResponse(e, call)
        call.respond(errorResponse)
    }
}

另一种情况是系统内部背景排程、执行非同步工作的错误。例如 DBAsyncTaskCoroutineActor 非同步写入资料库时,我们要 catch Exception 转为 ErrorLog 物件再写入,差别在於这里没有外部请求的 ApplicationCall 物件

private fun execute(task: DBAsyncTask) {
    try {
        transaction {
            task.block(this)
        }
    } catch (e: Throwable) {
        val errorMsg = "$actorName execute error"
        logger.error("errorMsg => $task", e)
        logWriter.write(
            ErrorLog.internal(
                InternalServerException(
                    InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
                    mapOf("taskId" to task.id, "taskType" to task.type)
                ),
                actorName, task.id.toString()
            )
        )
    }
}

结语

今天我说明了如何建立系统的 Logging 机制处理不同类型的 log,另一种类似的需求是系统也需要对各种非同步工作采用一致的方式处理。例如今天提到的 LogMessageCoroutineActor 非同步写入 log,还有 DBAsyncTaskCoroutineActor 非同步写入资料库。明天我会说明如何把 Coroutine SendChannel 包装成不同功能的 CoroutineActor 执行非同步工作。


<<:  透过机器学习审查合约书的4个优点

>>:  爬虫怎麽爬 从零开始的爬虫自学 DAY11 python列表基础篇

Day 24 - Single Number

大家好,我是毛毛。ヾ(´∀ ˋ)ノ 废话不多说开始今天的解题Day~ 136. Single Num...

[DAY13]给pod上个识别-Label

虾米系Label 当大家都完成第一步往k8s部署,第二步就是要来好好管理这些container,k8...

强型闯入DenoLand[28] - Oak 概念篇

强型闯入DenoLand[28] - Oak 概念篇 什麽是 Oak? Oak 是一款用来开发 h...

Day 27 - [Android APP] 05-API与物件

昨天讲的是 API 传递资料的流程,今天就来介绍怎麽把资料包装成物件,方便传递吧。 一样,会使用 J...

Day29: Picker controller

前言 今天要在 RecipeDetailView 中添加 Picker controller, 使其...