铁人赛已逐渐进入尾声,前面二十多天,我们一步步扩充加强 Ktor 功能,也整合了 ORM, Redis 许多框架函式库,整个系统架构及基础设施功能已逐渐成形。今天我想尽可能把先前累积实作的功能都串连起来,以此为基础开发 Multi-Channel Notifications 功能来当作范例。
今天会应用先前已实作的功能及概念,读者可参考以前的文章
NotificationType
实作如何产生讯息内容及指定收件者即可,其余底层寄送讯息的程序码都是共用的。Notification Plugin 会根据以下的设定,初始化 email, push, sms 3个 NotificationChannelSender
,每个 NotificationChannelSender
都有各自的细部设定可以调整。本机开发测试时,可以设定 mock = true 使用 MockNotificationChannelSender 只做写入 log 的动作就好。
# Notification plugin settings: one section per delivery channel (email, push, sms).
# Setting mock = true makes a channel use MockNotificationChannelSender, which only writes logs.
notification {
channels {
email {
mock = false
noReplyAddress = "[email protected]"
#marketingAddress = ""
awsSES {
nettyHttpClient {
http {
#maxConcurrency = 50 # => aws default value = 50
#maxPendingConnectionAcquires = 10000 => aws default value = 10000
#maxIdleConnectionTimeout = 60s => aws default value = 60s
}
threadPool {
fixedPoolSize = 3
}
}
}
}
push {
mock = false
fcm {
# https://github.com/firebase/firebase-admin-java/issues/317
# https://github.com/googleapis/google-auth-library-java/issues/356
# By default failing requests are retried up to 10 times with exponential backoff.
# connectTimeout = 60000
readTimeout = 180000
threadPool {
fixedPoolSize = 3
}
}
}
sms {
mock = true
}
}
// rest omitted
}
目前 NotificationChannelSender 包含以下实作子类别
maxReceiversPerRequest
=> 可以控制每次发送到第三方服务的讯息数量上限
shutdown()
=> 当 Server 停止时会呼叫此方法关闭与第三方服务的连线物件或 thread pool,例如呼叫 AWS SesAsyncClient 的 close()
方法
/**
 * Low-level sender that delivers a [NotificationMessage] through one notification channel.
 */
interface NotificationChannelSender {

    /** Upper bound on how many receivers go into a single request to the third-party service. */
    val maxReceiversPerRequest: Int

    /** Sends [message] through this channel. */
    fun send(message: NotificationMessage)

    /**
     * Called when the server stops so the implementation can release connections or thread pools.
     * Does nothing unless overridden.
     */
    fun shutdown() = Unit
}
/**
 * [NotificationChannelSender] implementation named after AWS SES.
 *
 * NOTE(review): the class body is omitted in this excerpt; presumably this is the email channel
 * sender selected by the `email.awsSES` configuration — confirm against the full source.
 */
class AwsSESSender(
config: AwsSESConfig,
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
/**
 * [NotificationChannelSender] implementation named after FCM.
 *
 * NOTE(review): the class body is omitted in this excerpt; presumably this is the push channel
 * sender selected by the `push.fcm` configuration, using [PushTokenStorage] for device tokens — confirm.
 */
class FCMSender(
config: FCMConfig,
private val pushTokenStorage: PushTokenStorage,
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
/**
 * [NotificationChannelSender] implementation configured by [MitakeConfig].
 *
 * NOTE(review): the class body is omitted in this excerpt; presumably this is the SMS channel
 * sender — confirm against the full source.
 */
class MitakeSender(
private val config: MitakeConfig,
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
/**
 * [NotificationChannelSender] used when a channel is configured with `mock = true`.
 * According to the accompanying article it only writes a log entry; the class body is omitted in this excerpt.
 */
class MockNotificationChannelSender(
private val loggingConfig: NotificationLogConfig,
private val logWriter: LogWriter
) : NotificationChannelSender
NotificationChannelSender 寄送讯息时会呼叫 send(message: NotificationMessage)
方法,NotificationMessage
除了讯息内容、收件者之外,还包含了许多额外资讯,方便未来查询。
所以这3个 id 的阶层关系由上到下的顺序是 eventId → notificationId → notificationMessageId,我们可以查询某个事件寄送的所有 Notification 及 NotificationMessage
/**
 * A single message handed to a [NotificationChannelSender].
 *
 * Besides the content and receivers it carries [eventId] and [notificationId]; together with
 * its own [id] these identify the message in the event -> notification -> message hierarchy.
 */
data class NotificationMessage(
    val notificationId: UUID,
    val eventId: UUID,
    val type: NotificationType,
    val version: String? = null,
    val channel: NotificationChannel,
    val lang: Lang,
    val sender: String? = null,
    val receivers: List<String>,
    val content: NotificationChannelContent,
    var sendAt: Instant? = null
) : IdentifiableObject<UUID>() {

    /** Random identifier generated for every message instance. */
    override val id: UUID = UUID.randomUUID()

    /**
     * Builds the log record for this message, copying the identifying fields and the receivers.
     * Result and response fields of the log are left at their defaults.
     */
    fun toNotificationMessageLog(): NotificationMessageLog {
        return NotificationMessageLog(
            id = id,
            notificationId = notificationId,
            eventId = eventId,
            type = type,
            version = version,
            channel = channel,
            lang = lang,
            receivers = receivers
        )
    }
}
NotificationChannelSender 寄送讯息时,也会使用 LogWriter 记录 log,包含第三方服务的回应码及讯息…等,方便未来追踪问题除错。更多实作细节可参考以前的文章 [Day 20] 实作 Logging Plugin 建立系统 Logging 机制
# Logging settings used by the notification channel senders when they record send results.
notification {
logging {
enabled = true
destination = "AwsKinesis" # File(default), Database, AwsKinesis
logSuccess = false
logSuccessReqBody = false
logSuccessRspBody = false
}
// rest omitted
}
/**
 * Log record describing the outcome of sending one notification message.
 *
 * The constructor carries the identifying fields plus the result and response details;
 * [content], [success] and [errorMsg] are filled in afterwards by the code that writes the log.
 */
@Serializable
data class NotificationMessageLog(
    @Serializable(with = UUIDSerializer::class) override val id: UUID,
    @Serializable(with = UUIDSerializer::class) val notificationId: UUID,
    @Serializable(with = UUIDSerializer::class) val eventId: UUID,
    val type: NotificationType,
    val version: String? = null,
    val channel: NotificationChannel,
    val lang: Lang,
    val receivers: List<String>,
    @Serializable(with = InstantSerializer::class) var sendAt: Instant? = null,
    // result
    var successList: JsonArray? = null,
    var failureList: JsonArray? = null,
    var invalidRecipientIds: List<String>? = null,
    // response detail
    var rspCode: String? = null,
    var rspMsg: String? = null,
    @Serializable(with = InstantSerializer::class) var rspAt: Instant? = null,
    var rspTime: Long? = null,
    var rspBody: String? = null
) : LogMessage() {

    /** Creation time of this log record. */
    @Serializable(with = InstantSerializer::class)
    override val occurAt: Instant = Instant.now()

    var content: String? = null

    /**
     * Whether the send succeeded. Assigning this also refreshes [logLevel], so a record marked
     * as failed is reported at ERROR level.
     */
    var success: Boolean = true
        set(value) {
            field = value
            logLevel = if (value) LogLevel.INFO else LogLevel.ERROR
        }

    var errorMsg: String? = null

    /**
     * INFO while [success] is true, ERROR once it has been set to false.
     *
     * Previously this was initialised once at construction time, when [success] is always true,
     * so it stayed INFO for failed sends. It keeps a backing field and is updated by the
     * [success] setter.
     */
    override var logLevel: LogLevel = LogLevel.INFO
        private set

    override val logType: String = LOG_TYPE

    companion object {
        const val LOG_TYPE = "notification_message"
    }
}
实作完底层 NotificationChannelSender 串接第三方服务後,接下来是实作上层负责寄送通知的 NotificationSender,我希望寄送通知的程序写法应该要非常简单,当某个功能需要寄送通知时,只要先透过 Koin DI 取得 NotificationSender 物件,再呼叫 send(notification: Notification)
方法即可。
// Usage: resolve the NotificationSender through Koin injection, then pass it the notification to send.
val notificationSender by inject<NotificationSender>()
notificationSender.send(notification)
/**
 * High-level entry point that application code uses to send a [Notification].
 */
interface NotificationSender {

    /** Sends [notification]. */
    fun send(notification: Notification)

    /** Releases resources held by the sender; does nothing unless overridden. */
    fun shutdown() = Unit
}
当我们使用 NotificationSender 发送通知时,需要先建立 Notification 物件,填入讯息内容 NotificationContent 及收件者 Recipient…等资料。
/**
 * A notification request: its [type], the [recipients], the per-channel [content] and
 * the arguments used to fill in that content.
 *
 * [id], [eventId] and [createAt] default to fresh values when not supplied.
 */
@Serializable
data class Notification(
    val type: NotificationType,
    val recipients: MutableSet<Recipient> = mutableSetOf(),
    val content: NotificationContent = NotificationContent(),
    val contentArgs: MutableMap<String, String> = mutableMapOf(),
    @Transient val templateArgs: MutableMap<String, Any> = mutableMapOf(), // templateArgs doesn't support i18n now
    @Transient val lazyLoadArg: Any? = null,
    val remote: Boolean = false,
    val remoteArg: JsonObject? = null,
    @Serializable(with = UUIDSerializer::class) override val id: UUID = UUID.randomUUID(),
    @Serializable(with = UUIDSerializer::class) val eventId: UUID = UUID.randomUUID(),
    @Serializable(with = InstantSerializer::class) val createAt: Instant = Instant.now(),
    var version: String? = null
) : IdentifiableObject<UUID>() {

    @Serializable(with = InstantSerializer::class)
    var sendAt: Instant? = null

    /** True when [type] declares a lazy-load block; captured once at construction. */
    @Transient
    val lazyLoad: Boolean = type.isLazy()

    /** Runs the lazy-load block of [type] against this notification. */
    fun load() {
        type.lazyLoad(this)
    }
}
/**
 * One receiver of a notification, with the optional contact details
 * ([email], [mobile], [pushTokens]) and preferred [lang].
 */
@Serializable
data class Recipient(
override val id: String,
val userType: UserType? = null,
@Serializable(with = UUIDSerializer::class) val userId: UUID? = null,
// val channels: Set<NotificationChannel>? = null, TODO => user notification preferences
val name: String? = null,
var lang: Lang? = null,
val email: String? = null,
val mobile: String? = null,
val pushTokens: Set<String>? = null
) : IdentifiableObject<String>()
/**
 * Notification content grouped by channel; each channel maps a [Lang] to that language's content.
 * All three maps start empty.
 */
@Serializable
data class NotificationContent(
val email: MutableMap<Lang, EmailContent> = mutableMapOf(),
val push: MutableMap<Lang, PushContent> = mutableMapOf(),
val sms: MutableMap<Lang, SMSContent> = mutableMapOf()
)
/**
 * Email content. [subject] and [body] are mutable and may start as null;
 * [attachments] is marked @Transient, so it is excluded from serialization.
 */
@Serializable
class EmailContent(
var subject: String? = null,
var body: String? = null,
@Transient val attachments: List<Attachment>? = null
)
/** Push notification content: a required [title] and [body], plus an optional string [data] map. */
@Serializable
class PushContent(
val title: String,
val body: String,
val data: Map<String, String>? = null
)
/** SMS content consisting only of the message [body]. */
@Serializable
class SMSContent(
val body: String
)
findRecipients(userFilters: Map<UserType, String>?) 方法
@Serializable
/**
 * Describes a kind of notification: which project it belongs to, which channels it uses and
 * an optional block that populates a [Notification] lazily.
 *
 * Subclasses may override [findRecipients] to look up recipients from user filters.
 */
open class NotificationType(
    val projectId: String,
    val name: String,
    val channels: Set<NotificationChannel>,
    val category: NotificationCategory,
    // val priority: NotificationPriority TODO => priority queues
    val version: String? = null,
    val lang: Lang? = null,
    @Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : IdentifiableObject<String>() {

    /** Identifier in the form `<projectId>_<name>`. */
    override val id: String = "${projectId}_$name"

    /** Whether a lazy-load block was supplied. */
    fun isLazy(): Boolean {
        return lazyLoadBlock != null
    }

    /**
     * Invokes the lazy-load block with this type as receiver.
     *
     * @throws IllegalArgumentException if no lazy-load block was supplied
     */
    fun lazyLoad(notification: Notification) {
        val block = requireNotNull(lazyLoadBlock)
        block(this, notification)
    }

    /** Always fails with [IllegalStateException] in the base class; subclasses override it. */
    open fun findRecipients(userFilters: Map<UserType, String>?): Set<Recipient> =
        error("NotificationType $id findRecipients is not yet implemented")
}
/** Delivery channels a notification can be sent through. */
enum class NotificationChannel {
Email, Push, SMS
}
/** Category assigned to a NotificationType. */
enum class NotificationCategory {
System, Marketing
}
/**
 * Priority levels for notifications.
 * NOTE(review): not referenced by any code in this excerpt (only by a commented-out TODO parameter).
 */
enum class NotificationPriority {
URGENT, HIGH, LOW
}
/**
 * [NotificationSender] that holds one optional [NotificationChannelSender] per channel
 * (email, push, sms).
 *
 * The class body is omitted in this excerpt. According to the accompanying article, `send`
 * forwards to the channel senders matching the channels of the notification's type.
 */
class NotificationDispatcher(
private val config: NotificationChannelConfig,
private val envMode: EnvMode,
private val availableLangs: AvailableLangs,
private val i18nNotificationProjectMessages: I18nNotificationProjectMessages,
private val emailSender: NotificationChannelSender? = null,
private val pushSender: NotificationChannelSender? = null,
private val smsSender: NotificationChannelSender? = null
) : NotificationSender
NotificationDispatcher 也实作 NotificationSender 介面,当呼叫 send(notification: Notification)
方法时,会根据 Notification 的 NotificationType 所定义的 channels: Set<NotificationChannel>
,呼叫对应 NotificationChannelSender 的 send(message: NotificationMessage)
方法
这里使用 decorator pattern 的手法,让 NotificationDispatcher 及 NotificationCoroutineActor 都实作 NotificationSender 介面,所以呼叫 send(notification: Notification) 方法时,会 delegate 到内部的 NotificationSender,执行顺序会是 NotificationCoroutineActor → NotificationDispatcher → NotificationChannelSender。更多 NotificationCoroutineActor 的实作细节,请参考 [Day 21] 使用 Coroutine SendChannel 处理非同步工作
/**
 * [NotificationSender] decorator that queues notifications on a [CoroutineActor] and
 * forwards each one to the wrapped [notificationSender] from the actor.
 *
 * Failures of the wrapped sender are logged and written to [logWriter]; they are not rethrown.
 */
class NotificationCoroutineActor(
    coroutineActorConfig: CoroutineActorConfig,
    private val notificationSender: NotificationSender,
    private val logWriter: LogWriter
) : NotificationSender {

    private val logger = KotlinLogging.logger {}

    private val actorName = "NotificationActor"

    private val actor: CoroutineActor<Notification> = CoroutineActor(
        actorName, Channel.UNLIMITED,
        coroutineActorConfig, Dispatchers.IO,
        this::execute, null,
        logWriter
    )

    /** Enqueues [notification] on the actor's channel. */
    override fun send(notification: Notification) {
        actor.sendToUnlimitedChannel(notification, InfraResponseCode.NOTIFICATION_ERROR) // non-blocking by Channel.UNLIMITED
    }

    /** Actor callback: delegates to the wrapped sender and records any failure. */
    private fun execute(notification: Notification) {
        try {
            notificationSender.send(notification)
        } catch (e: Throwable) {
            val errorMsg = "$actorName execute error"
            // Interpolate errorMsg; the previous literal "errorMsg" hid the actual message in the log.
            logger.error("$errorMsg => $notification", e)
            logWriter.write(
                ErrorLog.internal(
                    InternalServerException(
                        InfraResponseCode.NOTIFICATION_ERROR, errorMsg, e,
                        mapOf("id" to notification.id, "type" to notification.type, "eventId" to notification.eventId)
                    ),
                    actorName, notification.id.toString()
                )
            )
        }
    }

    /**
     * Shuts down the wrapped sender, then closes the actor.
     *
     * NOTE(review): the wrapped sender is shut down before the actor is closed; whether
     * notifications still queued at that point are processed depends on CoroutineActor.close() — confirm.
     */
    override fun shutdown() {
        notificationSender.shutdown()
        actor.close()
    }
}
最後实作 Notification Plugin 初始化 NotificationDispatcher、NotificationCoroutineActor 及所有 NotificationChannelSender,并且都注册至 Koin DI。当某个功能需要寄送通知时,只要先透过 Koin DI 取得 NotificationSender 物件,再呼叫 send(notification: Notification) 方法即可。
// Usage: install the notification plugin.
install(NotificationFeature)
// Plugin installation (abridged): builds the NotificationDispatcher, optionally wraps it in a
// NotificationCoroutineActor, registers the result in Koin and hooks its shutdown.
// NOTE(review): channelConfig, envMode, the channel senders, config and logWriter are defined in
// the omitted part of this function, as is its return statement.
override fun install(pipeline: Application, configure: Configuration.() -> Unit): NotificationFeature {
pipeline.koin {
modules(
module(createdAtStart = true) {
val notificationDispatcher = NotificationDispatcher(
channelConfig,
envMode,
availableLangs,
i18nNotificationProjectMessagesProviders,
emailSender,
pushSender,
smsSender
)
// Use the coroutine actor only when an asyncExecutor is configured; otherwise use the dispatcher directly.
val notificationSender = config.asyncExecutor?.let {
NotificationCoroutineActor(it.coroutineActor, notificationDispatcher, logWriter)
} ?: notificationDispatcher
single { notificationSender }
KoinApplicationShutdownManager.register { notificationSender.shutdown() }
// rest omitted
}
)
}
}
为了支援多国语言的讯息内容,我实作了 I18nNotificationProjectMessages 从语系档 notification_${lang}.properties 读取讯息文字,例如下面是 ops 子专案的 dataReport
NotificationType 的 Email 主旨,详细的实作细节可参考之前的文章 [Day 11] 实作 Ktor i18n 机制
# format => ${type}.${channel}.${part}=""
ops_dataReport.Email.subject=[维运] 资料查询报表: ${dataType} ${queryTime}
至於 Email 的 body 内容就需要准备每个语言的 html 样板档案
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
</head>
<body>
<div id="content">
<ul>
<li>
查询时间: ${queryTime}
</li>
<li>
查询资料类型: ${dataType}
</li>
<li>
查询条件: ${query}
</li>
</ul>
</div>
</body>
</html>
然後实作 NotificationTemplateProcessor 使用 FreeMarker 载入 html 样板档案,并替换里面的变数值
/**
 * Renders notification templates with FreeMarker.
 *
 * Templates are loaded from `/i18n/notification/templates` on the classpath and are named
 * `<templateName>_<lang code>.<ext>`. When the template for the requested language is missing,
 * the template of the first available language is used instead.
 */
class NotificationTemplateProcessor(
    private val availableLangs: AvailableLangs
) {

    private val cfg: Configuration = Configuration(Configuration.VERSION_2_3_31).apply {
        templateLoader = ClassTemplateLoader(NotificationTemplateProcessor::class.java, "/i18n/notification/templates")
        templateExceptionHandler = TemplateExceptionHandler.RETHROW_HANDLER
        logTemplateExceptions = false
        wrapUncheckedExceptions = true
        recognizeStandardFileExtensions = false
        defaultEncoding = "UTF-8"
        outputFormat = HTMLOutputFormat.INSTANCE
        locale = availableLangs.first().locale
        timeZone = TimeZone.getTimeZone(ZoneId.of("UTC"))
        dateFormat = "yyyy-MM-dd"
        dateTimeFormat = "yyyy-MM-dd HH:mm:ss"
    }

    /**
     * Renders the html email template of [type] for [lang] using [args].
     *
     * @throws InternalServerException when the template cannot be loaded or rendered
     */
    fun processEmail(type: NotificationType, args: Map<String, Any>, lang: Lang): String {
        return process(type.id, args, lang, "html")
    }

    /** Loads the template for [templateName]/[lang]/[ext] and renders it with [args]. */
    private fun process(templateName: String, args: Map<String, Any>, lang: Lang, ext: String): String {
        val templateFileName = buildTemplateFileName(templateName, lang, ext)
        val template = loadTemplate(templateName, templateFileName, ext)
        return try {
            StringWriter().also { writer -> template.process(args, writer) }.toString()
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.DEV_ERROR, "process template error: template = $templateName, args = $args", e)
        }
    }

    /**
     * Looks up [templateFileName] with missing files tolerated, then falls back to the template
     * of the first available language. Any failure is wrapped in [InternalServerException].
     */
    private fun loadTemplate(templateName: String, templateFileName: String, ext: String): Template {
        try {
            val requested: Template? = cfg.getTemplate(templateFileName, null, null, null, true, true)
            if (requested != null) {
                return requested
            }
            return cfg.getTemplate(buildTemplateFileName(templateName, availableLangs.first(), ext))
        } catch (e: Throwable) {
            throw InternalServerException(InfraResponseCode.DEV_ERROR, "template file $templateFileName parsing error or not found", e)
        }
    }

    private fun buildTemplateFileName(templateName: String, lang: Lang, ext: String): String {
        return "${templateName}_${lang.code}.$ext"
    }
}
实作的目标是 => 管理者可以填写讯息文字,并撰写 QueryDSL 只传送讯息给符合查询条件的使用者。以下面的 request body json 为例,系统只会寄送中文测试通知给符合查询条件使用者 (1980年後出生的男性,而且为启用状态的使用者)
{
"userFilters": {
"club_user": "[enabled = true and gender = Male and birthYear >= 1980]"
},
"content": {
"email": {
"zh-TW": {
"subject": "测试Email",
"body": "我是Email内容"
}
},
"push": {
"zh-TW": {
"title": "测试推播",
"body": "我是推播内容"
}
}
}
}
首先建立 POST /club/users/sendNotification
API,只有 ClubAuth.Admin 角色的使用者可以呼叫。接下来是透过 Koin DI 取得 NotificationSender,然後建立 NotificationType 为 SendNotification 的 Notification 物件,最後再呼叫 send() 方法即可发送通知。
由於是非同步发送通知,所以 API 会回传 notification 的 id,管理者可以稍後使用此 id 去 Kibana 搜寻 log 查询寄送结果
/**
 * Registers the club user routes.
 *
 * `POST {ClubConst.urlRootPath}/users/sendNotification` is wrapped in `authorize(ClubAuth.Admin)`.
 * It converts the submitted form into a notification, hands it to the injected
 * [NotificationSender] and responds with the notification id.
 */
fun Routing.clubUser() {
    val notificationSender by inject<NotificationSender>()

    route("${ClubConst.urlRootPath}/users") {
        authorize(ClubAuth.Admin) {
            post<SendNotificationForm, UUID>("/sendNotification", ClubOpenApi.SendNotification) { requestForm ->
                val outgoing = requestForm.toNotification(ClubNotification.SendNotification)
                notificationSender.send(outgoing)
                val notificationId = outgoing.id
                call.respond(DataResponseDTO.uuid(notificationId))
            }
        }
    }
}
/**
 * Request form for sending an ad-hoc notification.
 *
 * Recipients are either given directly in [recipients] or looked up through [userFilters].
 */
@Serializable
data class SendNotificationForm(
    val recipients: MutableSet<Recipient>? = null, // recipients supplied directly
    val userFilters: Map<UserType, String>? = null, // or a QueryDSL filter used to look up users
    val content: NotificationContent, // notification content supplied directly
    val contentArgs: MutableMap<String, String>? = null
) : Form<SendNotificationForm>() {

    /**
     * Builds the [Notification] of the given [type] from this form.
     *
     * Every email body in [content] is replaced in place by the result of `buildEmailHtmlBody`.
     *
     * @throws IllegalArgumentException if an email content entry has no body
     * @throws RequestException if no recipients were supplied or found
     */
    fun toNotification(type: NotificationType): Notification {
        content.email.values.forEach { emailContent ->
            // The body comes from the request and may be absent; fail with a clear message instead of a bare NPE.
            val rawBody = requireNotNull(emailContent.body) { "email content body is required" }
            emailContent.body = buildEmailHtmlBody(rawBody)
        }

        val resolvedRecipients = recipients ?: type.findRecipients(userFilters)
        if (resolvedRecipients.isEmpty()) {
            throw RequestException(InfraResponseCode.QUERY_RESULT_EMPTY, "recipients is empty")
        }

        return Notification(
            type, recipients = resolvedRecipients.toMutableSet(),
            content, contentArgs = contentArgs ?: mutableMapOf(),
            remote = false
        )
    }
    // rest omitted
}
我在 ClubNotification.kt 档案定义了 Club 子专案所有的 NotificationType。每个 NotificationType 都是子类别 ClubNotificationType
物件,ClubNotificationType
已实作 findRecipients(userFilters: Map<UserType, String>?)
方法,可以从资料库载入 Club 子专案中符合 QueryDSL 查询条件的使用者资料,包括 email, lang…等栏位,建立 Recipient 物件。
/** Notification types defined for the Club sub-project. */
object ClubNotification {

    /** System notification sent through the Push and Email channels. */
    val SendNotification = ClubNotificationType(
        name = "sendNotification",
        channels = setOf(NotificationChannel.Push, NotificationChannel.Email),
        category = NotificationCategory.System
    )
}
/**
 * [NotificationType] for the Club sub-project that loads recipients from the club user table.
 */
class ClubNotificationType(
    name: String,
    channels: Set<NotificationChannel>,
    category: NotificationCategory,
    @Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : NotificationType(ClubConst.projectId, name, channels, category, null, null, lazyLoadBlock) {

    /**
     * Queries club users, left-joined with their enabled devices, and maps each to a [Recipient].
     *
     * The filter text is taken from [userFilters] under `ClubUserType.User.value`; a missing or
     * blank entry means no extra filter.
     *
     * NOTE(review): `adjustWhere` receives a lambda that ignores the previous condition, so a
     * supplied filter appears to replace `ClubUserTable.enabled eq true` rather than being
     * AND-ed with it — confirm this is intended.
     */
    override fun findRecipients(userFilters: Map<UserType, String>?): Set<Recipient> {
        val filterText = userFilters?.get(ClubUserType.User.value)
        val userFilter =
            if (filterText.isNullOrBlank()) null
            else DynamicDBQuery.convertPredicate(DynamicQuery.parseFilter(filterText), UserDTO.mapper)

        return transaction {
            val query = ClubUserTable
                .join(UserDeviceTable, JoinType.LEFT, ClubUserTable.id, UserDeviceTable.userId) {
                    UserDeviceTable.enabled eq true
                }
                .slice(
                    ClubUserTable.id, ClubUserTable.account, ClubUserTable.name,
                    ClubUserTable.email, ClubUserTable.mobile, ClubUserTable.lang,
                    UserDeviceTable.id, UserDeviceTable.pushToken
                )
                .select { ClubUserTable.enabled eq true }

            if (userFilter != null) {
                query.adjustWhere { userFilter }
            }

            val users = query.toList().toDTO(UserDTO::class)
            users.map { user ->
                user.run {
                    Recipient(
                        id = account!!,
                        userType = ClubUserType.User.value,
                        userId = id,
                        name = name,
                        lang = lang,
                        email = email,
                        mobile = mobile,
                        pushTokens = devices?.mapNotNull { it.pushToken }?.toSet()
                    )
                }
            }.toSet()
        }
    }
}
实作的目标是 => 管理者可以撰写 QueryDSL 查询 User 资料表,把资料汇出成 Excel 档案,寄送至指定 email。以下面的 request body json 为例,系统会查询「角色为 AppTeam 而且为启用状态」的使用者资料 account, name 栏位,汇出成 Excel 档案,寄送 email 至 [email protected]
{
"dataType": "OpsUser",
"email": "[email protected]",
"query": "q_fields=account,name&q_filter=[role = AppTeam and enabled = true]&q_orderBy=createdAt"
}
查询 SQL => SELECT ops_user.id, ops_user.account, ops_user."name" FROM ops_user WHERE (ops_user."role" = 'AppTeam') AND (ops_user.enabled = true) ORDER BY ops_user.created_at ASC
首先建立 POST /ops/data/report API,只有 OpsAuth.OpsTeam 角色的使用者可以呼叫。接下来是透过 Koin DI 取得 NotificationSender,然後建立 NotificationType 为 DataReport 的 Notification 物件,最後再呼叫 send() 方法即可发送通知。
由於是非同步发送通知,所以 API 会回传 notification 的 id,管理者可以稍後使用此 id 去 Kibana 搜寻 log 查询寄送结果
/**
 * Registers the ops data report route.
 *
 * `POST {OpsConst.urlRootPath}/data/report` is wrapped in `authorize(OpsAuth.OpsTeam)`.
 * It creates a DataReport notification carrying the submitted form as its lazy-load argument,
 * hands it to the injected [NotificationSender] and responds with the notification id.
 */
fun Routing.opsDataReport() {
    val notificationSender by inject<NotificationSender>()

    route("${OpsConst.urlRootPath}/data/report") {
        authorize(OpsAuth.OpsTeam) {
            post<DataReportForm, UUID>(OpsOpenApi.DataReport) { reportForm ->
                val outgoing = Notification(OpsNotification.DataReport, lazyLoadArg = reportForm)
                notificationSender.send(outgoing)
                val notificationId = outgoing.id
                call.respond(DataResponseDTO.uuid(notificationId))
            }
        }
    }
}
/**
 * Request form for a data report: the [dataType] to export, the [query] string
 * and the optional [email] address.
 */
@Serializable
class DataReportForm(
val dataType: DataReportDataType,
val query: String,
var email: String? = null
) : Form<DataReportForm>()
/** Data types that can be exported as a report, each tied to the KType of its entity DTO. */
enum class DataReportDataType(val entityDTOType: KType) {
OpsUser(typeOf<UserDTO>())
}
我在 OpsNotification.kt 档案定义了 Ops 子专案所有的 NotificationType。每个 NotificationType 都是子类别 OpsNotificationType 物件。DataReport 的 lazyLoadBlock
lambda 会根据 QueryDSL 查询得到 List 物件,再汇出成 Excel 档案。
/** Notification types defined for the Ops sub-project. */
object OpsNotification {

    /**
     * Data report notification. Its lazy-load block reads the [DataReportForm] from
     * `lazyLoadArg`, adds the form's email as recipient, runs the query, and attaches the
     * result as an Excel file to the email content for the type's language.
     */
    val DataReport = OpsNotificationType("dataReport") { notification ->
        val form = notification.lazyLoadArg as DataReportForm
        // Bind the validated value once; form.email is a var, so it cannot be smart-cast.
        val email = requireNotNull(form.email) { "DataReportForm.email is required" }
        notification.recipients.add(Recipient(email, email = email))

        val dtoClass = form.dataType.entityDTOType.classifier as KClass<EntityDTO<*>>
        val dtoList = transaction {
            DynamicQuery.from(form.query).toDBQuery(dtoClass).toList(dtoClass)
        }

        val columnIds = ReportDataUtils.getColumnIds(dtoClass)
        val table = Table(form.dataType.name, columnIds)
        dtoList.forEach { table.addRow(ReportDataUtils.toMap(it, columnIds)) }
        val report = ReportData(id, name, mutableListOf(table))

        val queryTime = Instant.now()
        val args = mapOf(
            "dataType" to form.dataType.name,
            "queryTime" to DateTimeUtils.TAIWAN_DATE_TIME_FORMATTER.format(queryTime),
            "query" to form.query
        )
        notification.templateArgs.putAll(args)

        val fileName = "${this.name}_${args["dataType"]}_${args["queryTime"]}"
        val attachment = report.toExcelAttachment(fileName)
        // The receiver is a NotificationType, whose lang is nullable.
        val contentLang = requireNotNull(lang) { "NotificationType $id has no lang configured" }
        notification.content.email[contentLang] = EmailContent(attachments = listOf(attachment))
    }

    private val notificationType = typeOf<OpsNotificationType>()

    /** Every member property of this object whose declared type is [OpsNotificationType]. */
    val AllTypes = OpsNotification::class.memberProperties
        .filter { it.returnType == notificationType }
        .map { it.getter.call(this) as OpsNotificationType }
}
/**
 * [NotificationType] for the Ops sub-project: always the Email channel, System category,
 * no version, and `Lang.SystemDefault` as language.
 */
class OpsNotificationType(
    name: String,
    @Transient @OpenApiIgnore private val lazyLoadBlock: (NotificationType.(Notification) -> Unit)? = null
) : NotificationType(
    projectId = OpsConst.projectId,
    name = name,
    channels = setOf(NotificationChannel.Email),
    category = NotificationCategory.System,
    version = null,
    lang = Lang.SystemDefault,
    lazyLoadBlock = lazyLoadBlock
)
要填入 dataType
, queryTime
, query
3个 templateArgs 给 FreeMarker 替换 email template 变数
ops_dataReport.Email.subject=[维运] 资料查询报表: ${dataType} ${queryTime}
<!DOCTYPE html>
<html lang="zh-TW">
<head>
<meta charset="UTF-8">
</head>
<body>
<div id="content">
<ul>
<li>
查询时间: ${queryTime}
</li>
<li>
查询资料类型: ${dataType}
</li>
<li>
查询条件: ${query}
</li>
</ul>
</div>
</body>
</html>
>>: Day17【Web】网路攻击:点击劫持 Clickjacking
如果欢迎讯息写死在程序里,临时想换还要把程序打开来改,改完还要测试,不如就直接让它能在群组里设定吧...
在 VyOS 上设定好 BGP 後,我们来 FRRouting 上设定吧! 环境 我们这次内网使用 ...
昨天的小试身手大家写得如何呢?没有写出来也不要气馁哦,我们一起来看解答吧! 写法1: public ...
在讲Test Double以前,得先上搞清楚另外两个 SUT:System Under Test/S...
写在前面 test for placeholder test for placeholder tes...