Enrich (充实、使丰富),指的是在 Ingest Pipeline 中,透过其他地方取得相关的资料,并加在原来的资料当中,让资料更为丰富。
这种做法在资料处理 ETL (Extract, Transform, Load) 的过程中蛮常使用,也很重要的一种做法,能让我们能做到『空间换时间』或是『先苦後甘』这样的目的。
由於 Elasticsearch 不是关联式资料库,而是 Document Based (文件型) 的 NoSQL 资料库,所以文件在存入 Elasticsearch 之前应该要视情况去正规化,同时为了追求查询时能有较快的执行速度,会在文件存入时,尽可能将文件查询时会使用到的资讯先一并写入在文件之中,避免後续执行时要另外透过 Elasticsearch 的 Join 或是 Application 端另外处理资料查询及合并等动作。
例如以下几种情境:
在 Elasticsearch Ingest Pipeline 的处理过程中,有定义一个 Enrich Processor
,就是专门提供资料 Enrich 的处理,接着将介绍这个 Enrich Processor 的运作方式。
先摘录 Enrich Processor 的运作重点:
接下来我们针对运作的架构与流程进行较细部的说明。
上图的运作架构,在 Ingest Pipeline 的处理过程中,加上了 enrich
processor ,这个 enrich
的背後,共有三个不同的角色:
首先 Enrich Policy 是一组需要另外建立的设定,其中定义了 Enrich 的操作应该如何进行,包含
policy_type
定义找资料时要用哪一种比对方式。match
栏位,表示要从 Source Index 中的哪个栏位来进行查寻。enrich_fields
,要将从 Source Index 中查寻到文件里的哪些栏位,加入到原来的文件中。Enrich Policy 是要经过 Execute
(执行) 的 API 来触发运作,并不是自动会在背景执行的机制,在执行时,会将 Source Index 里符合条件的资料找出,并写入到 Enrich Index 当中进行独立的储存。
注意,Enrich Policy 建立後不能修改,只能删除并建立新的 Enrich Policy。
Enrich 的处理过程中,会透过某个资料的来源进行查询以取得额外的资料,这个资料来源必须是 Elasticsearch 中的 Index,也就所谓的 Source Index。
Source Index 可以是一个或多个 Elasticsearch 的 Index,而这个 Index 其实就是一般 Elasticsearch 的 Index,并没有不同,所以能用一般存取的方式进行资料的维护,并且一个 Elasticsearch 的 Index 可以同时当作多个不同 Enrich 处理的 Source Index。
由於每次 Enrich Processor 在处理 Indexing 的文件时,若当下直接从 Source Index 查找资料时,因为较花资源,另外也可能因为查询条件较复杂会执行较久,所以 Enrich 的运作机制中,有定义了 Enrich Index,让 Enrich Policy 执行时,透过 Elasticsearch 所建立一个系统层级的 Index,并且会与 Enrich Policy 绑定,里面存放着在 Source Index 里找到的文件,也是 Enrich Processor 在处理 Indexing 文件时,实际会用来查找资料的资料来源。
Enrich Index 有以下几个特性:
.enrich-*
开头。在了解 Enrich Processor 的运作方式之後,这边来介绍要使用时的完整步骤:
enrich
processor:可以将 enrich
processor 添加到现有的 Ingest Pipeline 之中,或是建立新的 Ingest Pipeline。enrich
processor 使用新的 Enrich Policy,再删除旧的 Enrich Policy。依照上述的步骤,我们首先准备 Source Index users
:
PUT /users/_doc/1?refresh=wait_for
{
"email": "[email protected]",
"first_name": "Mardy",
"last_name": "Brown",
"city": "New Orleans",
"county": "Orleans",
"state": "LA",
"zip": 70116,
"web": "mardy.asciidocsmith.com"
}
接着我们定义 Enrich Policy - users-policy
,并且指定使用 email
栏位来进行查阅,若有查到,我们要将 first_name
、last_name
、city
、zip
、state
的资料增加到 indexing 的文件中。
PUT /_enrich/policy/users-policy
{
"match": {
"indices": "users",
"match_field": "email",
"enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
}
}
执行 Enrich Policy,以建立 Enrich Index。
POST /_enrich/policy/users-policy/_execute
这时可以先使用 _cat/indices
查看 Enrich Index 是否有正确建立:
GET _cat/indices/.enrich-users-policy*?v
并使用 _search
查看 Enrich Index 里的内容:
GET .enrich-users-policy-*/_search
接着我们建立 Ingest Pipeline 并且使用 enrich
processor
PUT /_ingest/pipeline/user_lookup
{
"processors" : [
{
"enrich" : {
"description": "Add 'user' data based on 'email'",
"policy_name": "users-policy",
"field" : "email",
"target_field": "user",
"max_matches": "1"
}
}
]
}
我们可以 Indexing 文件,并指定 Ingest Pipeline 来确认是否正常运作
PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
"email": "[email protected]"
}
最後确认 Indexing 进入 Elasticsearch 的文件有正确的如我们的预期被 Enrich。
GET /my-index-000001/_doc/my_id
除了上述的 term
查阅的 Enrich 方式,Enrich Processor 也有提供 geo_shape
查阅方式,可以参考 官方文件 - Enrich you data based on geolocation。
使用 Ingest Pipeline 时,如果发生错误,预设的处理行为会丢出 Exception (例外状况) 的错误,并且停止这笔资料的 Indexing 处理。
如果我们希望在某一个特定 Ingest Processor 的处理发生错误时,能忽略这个错误,继续的向下执行,我们可以有三种作法:
ignore_failure
的属性,并设定成 true
,让错误发生时,直接略过当前的 processor,进入下一个 processor 的处理。on_failure
的设定,让错误发生时,执行另外一系列的 processors。(里面的 processor 也可以再指定错误发生时的 on_failure
,型成巢状的设定)PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"rename": {
"description": "Rename 'provider' to 'cloud.provider'",
"field": "provider",
"target_field": "cloud.provider",
"on_failure": [
{
"set": {
"description": "Set 'error.message'",
"field": "error.message",
"value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
"override": false
}
}
]
}
}
]
}
on_failure
,将整个 pipeline 最终会发生的错误,给抓住。(以下的例子配合 set
processor,将这笔发生错误的资料,另外写到指定的 index 中。)PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Index document to 'failed-<index>'",
"field": "_index",
"value": "failed-{{{ _index }}}"
}
}
]
}
在使用 on_failure
时,也可以使用以下的属性,取得错误相关的资讯:
on_failure_message
on_failure_processor_type
on_failure_processor_tag
on_failure_pipeline
使用方式如下:
PUT _ingest/pipeline/my-pipeline
{
"processors": [ ... ],
"on_failure": [
{
"set": {
"description": "Record error information",
"field": "error_information",
"value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
}
}
]
}
查看最新 Elasticsearch 或是 Elastic Stack 教育训练资讯: https://training.onedoggo.com
欢迎追踪我的 FB 粉丝页: 乔叔 - Elastic Stack 技术交流
不论是技术分享的文章、公开线上分享、或是实体课程资讯,都会在粉丝页通知大家哦!
<<: 初学者跪着学JavaScript Day26 : 认识生成器,chris不生气
>>: Day26-介接 API(番外篇 I)NLP 自然语言处理之初见 Dialogflow ES
八皇后问题可以说是一道相当经典的演算法题目,以西洋棋为背景,如何在一个8x8的棋盘上摆放八个皇后的...
网路上的即时文件协作中,除了 Google Documents 系列外,HackMD 也走出了一条自...
今天要给大家看实例,接下来用图片介绍。 首先我们先在主画面建立三个清单元件 以及新增两个客制化的画面...
前面几篇文章都在提Azure DevOps Artifacts,也就是如何利用这个服务来达成私有化的...
昨天介绍了在 Function Component 中该如何操作 state 的方法(附上传送门)...