Beyond the basic request log and error log, specific features also need to record their own logs. For example, when a user logs in, we want to record the source IP, the device, and the reason a login failed; when sending a message notification, we record the message type, the delivery channel, and the third-party service's response to determine whether delivery succeeded. Even though these logs have different data fields and are written at different times, we still need a unified way to handle them.
The design works like this: each log record is first wrapped in a LogMessage data class, then converted to the data format required by the write destination, for example JSON. A LogWriter writes a LogMessage to an arbitrary destination, such as a database, or converts it to JSON and sends it to AWS Kinesis Data Firehose. The LogType of a LogMessage determines which LogWriter writes it to which destination. To support a new kind of log with an existing LogWriter, you only need to implement a new LogMessage data class. Then why not just use a logging framework such as Logback? Because these logs are structured objects with their own typed fields and destinations such as a database or Kinesis, not formatted text messages written to an appender.
AwsKinesisLogWriter uses the Kinesis Firehose SDK to send JSON to Elasticsearch directly (I did not install the Kinesis Agent on EC2 to relay the log data). For an introduction to the AWS EKK stack, see the article From ELK Stack to EKK: Aggregating and Analyzing Apache Logs with Amazon Elasticsearch Service, Amazon Kinesis, and Kibana.
Every feature's log data class must extend LogMessage, which contains at least the following three fields. The source code of the various log data classes is here:
abstract class LogMessage : IdentifiableObject<UUID>() {

    abstract val occurAt: Instant
    abstract val logType: String
    abstract val logLevel: LogLevel

    fun toJson(): JsonElement = json.encodeToJsonElement(this)
}

@Serializable
data class LoginLog(
    @Serializable(with = UUIDSerializer::class) val userId: UUID,
    val resultCode: LoginResultCode,
    @Serializable(with = InstantSerializer::class) override val occurAt: Instant,
    val project: String,
    val source: PrincipalSource,
    val tenantId: TenantId? = null,
    val clientId: String? = null,
    val clientVersion: String?,
    val ip: String? = null,
    val sid: String? = null
) : LogMessage() {

    @Serializable(with = UUIDSerializer::class)
    override val id: UUID = UUID.randomUUID()

    override val logType: String = LOG_TYPE
    override val logLevel: LogLevel = LOG_LEVEL

    companion object {
        const val LOG_TYPE = "login"
        private val LOG_LEVEL = LogLevel.INFO
    }
}
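To illustrate how little is needed to add a new kind of log, here is a minimal, dependency-free sketch. The NotificationLog name and its fields are hypothetical (echoing the notification example from the introduction), and the base class is simplified: it drops the project's IdentifiableObject and kotlinx.serialization pieces.

```kotlin
import java.time.Instant
import java.util.UUID

// Simplified stand-ins for the project's types, for illustration only.
enum class LogLevel { DEBUG, INFO, WARN, ERROR }

abstract class LogMessage {
    val id: UUID = UUID.randomUUID()
    abstract val occurAt: Instant
    abstract val logType: String
    abstract val logLevel: LogLevel
}

// A hypothetical notification log: the extra fields are free-form per log type.
data class NotificationLog(
    override val occurAt: Instant,
    val channel: String,          // e.g. "email", "sms"
    val messageType: String,
    val providerResponse: String  // third-party service response, to judge delivery success
) : LogMessage() {

    override val logType: String = LOG_TYPE
    override val logLevel: LogLevel = LogLevel.INFO

    companion object {
        const val LOG_TYPE = "notification"
    }
}

fun main() {
    val log = NotificationLog(Instant.now(), "email", "welcome", "250 OK")
    println(log.logType) // prints "notification"
}
```

Only the data class is new; routing and writing are handled generically by the LogWriter machinery below.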
Currently, a LogMessage can be written to different destinations with the following LogWriter implementations:
interface LogWriter {

    fun write(message: LogMessage)

    fun shutdown() {}
}
LogMessageDispatcher also implements the LogWriter interface. When its write(message: LogMessage) method is called, it looks up the LogWriter registered for the LogMessage's LogType, then calls that writer's write(message: LogMessage) method to write the log.
class LogMessageDispatcher(private val defaultLogWriter: LogWriter? = null) : LogWriter {

    private val logWriters: MutableMap<String, LogWriter> = mutableMapOf()

    fun register(logType: String, logWriter: LogWriter) {
        require(!logWriters.containsKey(logType))
        logWriters[logType] = logWriter
    }

    override fun write(message: LogMessage) {
        val logWriter = logWriters[message.logType] ?: defaultLogWriter ?: throw InternalServerException(
            InfraResponseCode.SERVER_CONFIG_ERROR, "logType ${message.logType} logWriter is not registered"
        )
        logWriter.write(message)
    }

    override fun shutdown() {
        logWriters.values.forEach { it.shutdown() }
    }
}
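The routing behaviour can be exercised in isolation. The sketch below pairs the same dispatch logic with two tiny in-memory writers; the InMemoryLogWriter, the simplified LogMessage, and the use of error() instead of the project's InternalServerException are my stand-ins, not the project's code.

```kotlin
import java.time.Instant

// Simplified stand-ins for the project's types (illustration only).
enum class LogLevel { INFO, ERROR }
open class LogMessage(val logType: String, val logLevel: LogLevel, val occurAt: Instant = Instant.now())

interface LogWriter {
    fun write(message: LogMessage)
    fun shutdown() {}
}

// Records messages so the routing can be observed.
class InMemoryLogWriter : LogWriter {
    val messages = mutableListOf<LogMessage>()
    override fun write(message: LogMessage) { messages += message }
}

// Same dispatch logic as the article's class, with the project-specific
// exception replaced by error() throwing IllegalStateException.
class LogMessageDispatcher(private val defaultLogWriter: LogWriter? = null) : LogWriter {

    private val logWriters = mutableMapOf<String, LogWriter>()

    fun register(logType: String, logWriter: LogWriter) {
        require(!logWriters.containsKey(logType)) { "logType $logType already registered" }
        logWriters[logType] = logWriter
    }

    override fun write(message: LogMessage) {
        val logWriter = logWriters[message.logType]
            ?: defaultLogWriter
            ?: error("logType ${message.logType} logWriter is not registered")
        logWriter.write(message)
    }

    override fun shutdown() = logWriters.values.forEach { it.shutdown() }
}

fun main() {
    val requestWriter = InMemoryLogWriter()
    val fallback = InMemoryLogWriter()
    val dispatcher = LogMessageDispatcher(fallback)
    dispatcher.register("request", requestWriter)

    dispatcher.write(LogMessage("request", LogLevel.INFO))  // routed by logType
    dispatcher.write(LogMessage("unknown", LogLevel.ERROR)) // falls back to the default writer
    println(requestWriter.messages.size) // 1
    println(fallback.messages.size)      // 1
}
```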
Here the decorator pattern is applied: both LogMessageDispatcher and LogMessageCoroutineActor implement the LogWriter interface, so a call to write(message: LogMessage) is delegated to the inner LogWriter; the execution order is LogMessageCoroutineActor → LogMessageDispatcher → XXXLogWriter. Internally, LogMessageCoroutineActor uses a Coroutine SendChannel to achieve asynchronous execution. For more implementation details of CoroutineActor, see [Day 21] Using a Coroutine SendChannel to Handle Asynchronous Work.
class LogMessageCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val logWriter: LogWriter
) : LogWriter {

    private val logger = KotlinLogging.logger {}

    private val actorName = "LogWriterActor"

    private val actor: CoroutineActor<LogMessage> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute
    )

    override fun write(message: LogMessage) {
        actor.sendToUnlimitedChannel(message, InfraResponseCode.LOG_ERROR) // non-blocking by Channel.UNLIMITED
    }

    private fun execute(message: LogMessage) {
        try {
            logWriter.write(message)
        } catch (e: Throwable) {
            logger.error("$actorName execute error => $message", e)
        }
    }

    override fun shutdown() {
        actor.close()
        logWriter.shutdown()
    }
}
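The actor above relies on the project's CoroutineActor wrapper. As a dependency-free approximation of the same decorator idea, the sketch below decouples callers with a blocking queue and a single worker thread; the queue-based AsyncLogWriter is my simplification (messages reduced to strings), not the project's implementation.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

interface LogWriter {
    fun write(message: String)
    fun shutdown() {}
}

class InMemoryLogWriter : LogWriter {
    val messages = mutableListOf<String>()
    override fun write(message: String) { messages += message }
}

// Decorator: write() only enqueues; a single worker thread drains the queue
// and delegates to the wrapped writer, so callers never block on slow I/O.
class AsyncLogWriter(private val delegate: LogWriter) : LogWriter {

    private val queue = LinkedBlockingQueue<String>()
    @Volatile private var running = true

    private val worker = Thread {
        // Keep draining until shutdown is requested AND the queue is empty.
        while (running || queue.isNotEmpty()) {
            val msg = queue.poll(100, TimeUnit.MILLISECONDS) ?: continue
            try {
                delegate.write(msg)
            } catch (e: Throwable) {
                System.err.println("async write failed: $msg ($e)")
            }
        }
    }.apply { isDaemon = true; start() }

    override fun write(message: String) { queue.put(message) } // effectively non-blocking (unbounded queue)

    override fun shutdown() {
        running = false
        worker.join()       // wait until remaining messages are flushed
        delegate.shutdown()
    }
}

fun main() {
    val sink = InMemoryLogWriter()
    val writer = AsyncLogWriter(sink)
    writer.write("hello")
    writer.write("world")
    writer.shutdown() // drains the queue before returning
    println(sink.messages) // [hello, world]
}
```

The coroutine version in the article gets the same decoupling without a dedicated thread per writer, which is why the project uses a SendChannel instead.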
The Logging Plugin initializes the corresponding LogWriter instances according to the application.conf settings, then registers them with Koin DI. Because the request log and the error log are the system's most basic LogTypes, I configure them inside the Logging Plugin. With the settings below, the request log uses AwsKinesisLogWriter, while the error log uses ErrorLogDBWriter.
logging {
    request {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
        includeHeaders = false
        includeQueryString = false
        includeResponseBody = false
        includeGetMethod = false
        excludePaths = ["/ops/sysstat/healthCheck"]
        excludeRequestBodyPaths = ["/login", "/myPassword"]
    }
    error {
        enabled = true
        destination = "Database" # File(default), Database, AwsKinesis
        # TODO integrate notification feature to notify OpsTeam by email, sms...
    }
    writer {
        awsKinesis {
            streamName = "logging"
            nettyClient {
                #maxConcurrency = 50 # aws default value = 50
                #maxPendingConnectionAcquires = 10000 # aws default value = 10000
                #maxIdleConnectionTimeout = 60s # aws default value = 60s
            }
            threadPool {
                fixedPoolSize = 3
            }
        }
    }
    asyncExecutor {
        coroutineActor {
            coroutines = 1
            dispatcher {
                fixedPoolSize = 1
            }
        }
    }
}
Below is the internal code of the Logging Plugin. I first create the LogMessageDispatcher object; afterwards, the LogWriter for each LogType is registered with the LogMessageDispatcher, so that when a log is written, the matching LogWriter can be looked up by the LogMessage's LogType to perform the write. If a LogType has no registered LogWriter, the default FileLogWriter is used to write to a file.
pipeline.koin {
    modules(
        module(createdAtStart = true) {
            single { loggingConfig }

            val fileLogWriter = FileLogWriter()
            single { fileLogWriter }

            val logMessageDispatcher = LogMessageDispatcher(fileLogWriter)
            single { logMessageDispatcher }

            val awsKinesisLogWriter = loggingConfig.writer?.awsKinesis?.let {
                AwsKinesisLogWriter(it, serverConfig)
            }
            if (awsKinesisLogWriter != null)
                single { awsKinesisLogWriter }

            if (loggingConfig.request.enabled) {
                val requestLogWriter = when (loggingConfig.request.destination) {
                    LogDestination.File -> fileLogWriter
                    LogDestination.Database -> RequestLogDBWriter()
                    LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
                        InfraResponseCode.SERVER_CONFIG_ERROR, "AwsKinesisLogWriter is not configured"
                    )
                }
                logMessageDispatcher.register(RequestLog.LOG_TYPE, requestLogWriter)
            }

            if (loggingConfig.error.enabled) {
                val errorLogWriter = when (loggingConfig.error.destination) {
                    LogDestination.File -> fileLogWriter
                    LogDestination.Database -> ErrorLogDBWriter()
                    LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
                        InfraResponseCode.SERVER_CONFIG_ERROR, "AwsKinesisLogWriter is not configured"
                    )
                }
                logMessageDispatcher.register(ErrorLog.LOG_TYPE, errorLogWriter)
            }

            val logWriter = loggingConfig.asyncExecutor?.let {
                LogMessageCoroutineActor(it.coroutineActor, logMessageDispatcher)
            } ?: logMessageDispatcher
            single { logWriter }

            KoinApplicationShutdownManager.register { logWriter.shutdown() }
        }
    )
}
After the Logging Plugin has been installed and the LogMessageDispatcher initialized, the SessionAuth Plugin can retrieve the LogMessageDispatcher object through Koin DI and register AwsKinesisLogWriter for LoginLog. Then, when a user logs in or out, AwsKinesisLogWriter is used to write the LoginLog.
sessionAuth {
    storageType = "Redis" # Redis
    redisKeyExpiredNotification = true
    session {
        expireDuration = 1d
        extendDuration = 15m
    }
    logging {
        enabled = true
        destination = "AwsKinesis" # File(default), Database, AwsKinesis
    }
}
val loginLogWriter = when (authConfig.logging.destination) {
    LogDestination.File -> pipeline.get<FileLogWriter>()
    LogDestination.Database -> LoginLogDBWriter()
    LogDestination.AwsKinesis -> pipeline.get<AwsKinesisLogWriter>()
}

val logMessageDispatcher = pipeline.get<LogMessageDispatcher>()
logMessageDispatcher.register(LoginLog.LOG_TYPE, loginLogWriter)
System errors come in two kinds. The first occurs while handling an external request: in the global exception handler, we first obtain the LogWriter from Koin DI, then convert the exception into an ErrorLog object and write it.
install(StatusPages) {
    val loggingConfig = get<LoggingConfig>()
    val logWriter = get<LogWriter>()
    val responseCreator = get<I18nResponseCreator>()

    exception<Throwable> { cause ->
        val e = ExceptionUtils.wrapException(cause)
        ExceptionUtils.writeLogToFile(e, call)
        if (loggingConfig.error.enabled) {
            logWriter.write(ErrorLog.request(e, call))
        }
        val errorResponse = responseCreator.createErrorResponse(e, call)
        call.respond(errorResponse)
    }
}
The second kind comes from internal background jobs and asynchronous work. For example, when DBAsyncTaskCoroutineActor writes to the database asynchronously, we must catch the exception and convert it into an ErrorLog object before writing it; the difference is that there is no ApplicationCall object from an external request here.
private fun execute(task: DBAsyncTask) {
    try {
        transaction {
            task.block(this)
        }
    } catch (e: Throwable) {
        val errorMsg = "$actorName execute error"
        logger.error("$errorMsg => $task", e)
        logWriter.write(
            ErrorLog.internal(
                InternalServerException(
                    InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
                    mapOf("taskId" to task.id, "taskType" to task.type)
                ),
                actorName, task.id.toString()
            )
        )
    }
}
Today I explained how to build the system's logging mechanism to handle different types of logs. A similar requirement is that the system also needs a consistent way to handle all kinds of asynchronous work, for example today's LogMessageCoroutineActor, which writes logs asynchronously, and DBAsyncTaskCoroutineActor, which writes to the database asynchronously. Tomorrow I will explain how to wrap a Coroutine SendChannel into CoroutineActors that perform different kinds of asynchronous work.