26 - 建立结构化的 Log (4/4) - Elasticsearch Ingest Pipeline 资料 Index 前的转换好帮手 - Enrich 资料与例外处理

建立结构化的 Log 系列文章


本篇学习重点

  • 使用 Ingest Pipeline 时,想要透过查找其他资讯,将相关的资讯加入到处理的文件中,这个 Enrich 功能是如何运作及要怎麽使用。
  • 使用 Ingest Pipeline 时,若发生错误,要如何进行例外状况的处理。

如何在 Ingest Pipeline 时 Enrich 资料

什麽是 Enrich

Enrich (充实、使丰富),指的是在 Ingest Pipeline 中,透过其他地方取得相关的资料,并加在原来的资料当中,让资料更为丰富。

这种做法在资料处理 ETL (Extract, Transform, Load) 的过程中蛮常使用,也很重要的一种做法,能让我们能做到『空间换时间』或是『先苦後甘』这样的目的。

由於 Elasticsearch 不是关联式资料库,而是 Document Based (文件型) 的 NoSQL 资料库,所以文件在存入 Elasticsearch 之前应该要视情况去正规化,同时为了追求查询时能有较快的执行速度,会在文件存入时,尽可能将文件查询时会使用到的资讯先一并写入在文件之中,避免後续执行时要另外透过 Elasticsearch 的 Join 或是 Application 端另外处理资料查询及合并等动作。

例如以下几种情境:

  • 在存放销售订单的 Document 中,依照订单里的 Product ID 将 Product 的详细资料查询出来,加在销售订单的文件之中。
  • 透过已定义好的 IP 位置清单,识别出某一笔处理的请求是来自於客户或是某个供应商。
  • 根据地理坐标,查出地址或是邮递区号,加在原来的文件之中。
  • Web Server 的存取 logs,透过 IP反查出 Geo Location 的坐标资讯并且记录在 log 之中,让日後使用时能直接透过地图呈现地区的分布。
  • 在使用者点击观看影片记录的 log 之中,先将日後分析时会使用到的影片类型、使用者资讯,先反查出来添加在 log 之中。

在 Elasticsearch Ingest Pipeline 的处理过程中,有定义一个 Enrich Processor ,就是专门提供资料 Enrich 的处理,接着将介绍这个 Enrich Processor 的运作方式。

Enrich Processor 的运作方式

先摘录 Enrich Processor 的运作重点:

  • Lookup (查找) 的来源 (Source Index) 只能是 Elasticsearch Index,不支援从 Elasticsearch 的外部读资料。
  • 会依照 Policy 的查找规则,将符合规则的资料转存在另一个 Enrich Index 中。
  • Enrich Processor 在运作时,只会比对 Enrich Index 里的资料,有找到就会加入到 Document 里。
  • Source Index 的资料更新时,不会反应到 Enrich Index 里,会需要另外重新执行 Policy,才能重新产生新的 Enrich Index 资料。

接下来我们针对运作的架构与流程进行较细部的说明。

26-enrich-process

上图的运作架构,在 Ingest Pipeline 的处理过程中,加上了 enrich processor ,这个 enrich 的背後,共有三个不同的角色:

Enrich Policy

首先 Enrich Policy 是一组需要另外建立的设定,其中定义了 Enrich 的操作应该如何进行,包含

  • 定义存放 Enrich 资料的 Source Index。
  • policy_type 定义找资料时要用哪一种比对方式。
  • 指定 match 栏位,表示要从 Source Index 中的哪个栏位来进行查寻。
  • enrich_fields ,要将从 Source Index 中查寻到文件里的哪些栏位,加入到原来的文件中。

Enrich Policy 是要经过 Execute (执行) 的 API 来触发运作,并不是自动会在背景执行的机制,在执行时,会将 Source Index 里符合条件的资料找出,并写入到 Enrich Index 当中进行独立的储存。

注意,Enrich Policy 建立後不能修改,只能删除并建立新的 Enrich Policy。

Source Index

Enrich 的处理过程中,会透过某个资料的来源进行查询以取得额外的资料,这个资料来源必须是 Elasticsearch 中的 Index,也就所谓的 Source Index。

Source Index 可以是一个或多个 Elasticsearch 的 Index,而这个 Index 其实就是一般 Elasticsearch 的 Index,并没有不同,所以能用一般存取的方式进行资料的维护,并且一个 Elasticsearch 的 Index 可以同时当作多个不同 Enrich 处理的 Source Index。

Enrich Index

由於每次 Enrich Processor 在处理 Indexing 的文件时,若当下直接从 Source Index 查找资料时,因为较花资源,另外也可能因为查询条件较复杂会执行较久,所以 Enrich 的运作机制中,有定义了 Enrich Index,让 Enrich Policy 执行时,透过 Elasticsearch 所建立一个系统层级的 Index,并且会与 Enrich Policy 绑定,里面存放着在 Source Index 里找到的文件,也是 Enrich Processor 在处理 Indexing 文件时,实际会用来查找资料的资料来源。

Enrich Index 有以下几个特性:

  • 是由 Elasticsearch 所建立及维护的 Index,因此不应该直接去使用这些系统 Index。
  • Enrich Index 的名称会是 .enrich-* 开头。
  • Enrich Index 被建立之後,会执行 Segment files 的 force merged 的,以增加查询时的效率。
  • Enrich Index 是唯读的,也就是无法修改里面的内容。

使用 Enrich Processor 的完整步骤

在了解 Enrich Processor 的运作方式之後,这边来介绍要使用时的完整步骤:

  1. 准备 Source Index:在 Indexing Document 时,提供 Ingest Pipeline 的 Enrich 查阅的资料,将这些资料存放在 Elasticsearch 的 Index 之中。
  2. 建立 Enrich Policy:透过 create enrich policy API 来建立 Enrich Policy。
  3. 执行 Enrich Policy:使用 execute enrich policy API 针对上面建立好的 Enrich Policy 来触发执行,并建立出 Enrich Index。
  4. 在 Ingest Pipeline 中指定 enrich processor:可以将 enrich processor 添加到现有的 Ingest Pipeline 之中,或是建立新的 Ingest Pipeline。
  5. 将文件 Indexing 到 Elasticsearch 之中,并指定使用上面建立好的 Ingest Pipeline。
  6. 如果查阅的资料有异动,先更新到 Source Index 之中,再执行步骤 3 的 execute enrich policy,将 Enrich Index 的资料进行更新,如果先前已经 Ingest 的资料也想要回溯,可以另外透过 _reindex 或 update_by_query 并指定 Ingest Pipeline,以使用新的 Enrich Index 来更新资料。
  7. 如果 Enrich Policy 要修改,先建立新的 Enrich Policy,并且修改 enrich processor 使用新的 Enrich Policy,再删除旧的 Enrich Policy。

一个实际使用 Enrich Processor 的例子

依照上述的步骤,我们首先准备 Source Index users

PUT /users/_doc/1?refresh=wait_for
{
  "email": "[email protected]",
  "first_name": "Mardy",
  "last_name": "Brown",
  "city": "New Orleans",
  "county": "Orleans",
  "state": "LA",
  "zip": 70116,
  "web": "mardy.asciidocsmith.com"
}

接着我们定义 Enrich Policy - users-policy,并且指定使用 email 栏位来进行查阅,若有查到,我们要将 first_namelast_namecityzipstate 的资料增加到 indexing 的文件中。

PUT /_enrich/policy/users-policy
{
  "match": {
    "indices": "users",
    "match_field": "email",
    "enrich_fields": ["first_name", "last_name", "city", "zip", "state"]
  }
}

执行 Enrich Policy,以建立 Enrich Index。

POST /_enrich/policy/users-policy/_execute

这时可以先使用 _cat/indices 查看 Enrich Index 是否有正确建立:

GET _cat/indices/.enrich-users-policy*?v

并使用 _search 查看 Enrich Index 里的内容:

GET .enrich-users-policy-*/_search

接着我们建立 Ingest Pipeline 并且使用 enrich processor

PUT /_ingest/pipeline/user_lookup
{
  "processors" : [
    {
      "enrich" : {
        "description": "Add 'user' data based on 'email'",
        "policy_name": "users-policy",
        "field" : "email",
        "target_field": "user",
        "max_matches": "1"
      }
    }
  ]
}

我们可以 Indexing 文件,并指定 Ingest Pipeline 来确认是否正常运作

PUT /my-index-000001/_doc/my_id?pipeline=user_lookup
{
  "email": "[email protected]"
}

最後确认 Indexing 进入 Elasticsearch 的文件有正确的如我们的预期被 Enrich。

GET /my-index-000001/_doc/my_id

参考官方 Geo Location 的范例

除了上述的 term 查阅的 Enrich 方式,Enrich Processor 也有提供 geo_shape 查阅方式,可以参考 官方文件 - Enrich you data based on geolocation

使用 Ingest Pipeline 时的例外处理

使用 Ingest Pipeline 时,如果发生错误,预设的处理行为会丢出 Exception (例外状况) 的错误,并且停止这笔资料的 Indexing 处理。

如果我们希望在某一个特定 Ingest Processor 的处理发生错误时,能忽略这个错误,继续的向下执行,我们可以有三种作法:

  1. 在 processor 的设定中,指定 ignore_failure 的属性,并设定成 true ,让错误发生时,直接略过当前的 processor,进入下一个 processor 的处理。
  2. 在 processor 的设定中,指定 on_failure 的设定,让错误发生时,执行另外一系列的 processors。(里面的 processor 也可以再指定错误发生时的 on_failure,型成巢状的设定)
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "description": "Rename 'provider' to 'cloud.provider'",
        "field": "provider",
        "target_field": "cloud.provider",
        "on_failure": [
          {
            "set": {
              "description": "Set 'error.message'",
              "field": "error.message",
              "value": "Field 'provider' does not exist. Cannot rename to 'cloud.provider'",
              "override": false
            }
          }
        ]
      }
    }
  ]
}
  1. 直接在最外层的 pipeline 设定 on_failure,将整个 pipeline 最终会发生的错误,给抓住。(以下的例子配合 set processor,将这笔发生错误的资料,另外写到指定的 index 中。)
PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{ _index }}}"
      }
    }
  ]
}

在使用 on_failure 时,也可以使用以下的属性,取得错误相关的资讯:

  • on_failure_message
  • on_failure_processor_type
  • on_failure_processor_tag
  • on_failure_pipeline

使用方式如下:

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

参考资料

  1. 官方文件 - Elasticsearch Ingest Enrich Data
  2. 官方文件 - Create Enrich Policy API
  3. 官方文件 - Enrich you data based on geolocation
  4. 官方文件 - Elasticsearch Ingest - Handling Pipeline Failures

查看最新 Elasticsearch 或是 Elastic Stack 教育训练资讯: https://training.onedoggo.com
欢迎追踪我的 FB 粉丝页: 乔叔 - Elastic Stack 技术交流
不论是技术分享的文章、公开线上分享、或是实体课程资讯,都会在粉丝页通知大家哦!


<<:  初学者跪着学JavaScript Day26 : 认识生成器,chris不生气

>>:  Day26-介接 API(番外篇 I)NLP 自然语言处理之初见 Dialogflow ES

Day28:八皇后问题- 8 Queens Puzzle

八皇后问题可以说是一道相当经典的演算法题目,以西洋棋为背景,如何在一个8x8的棋盘上摆放八个皇后的...

Day 20 架设开源的 CodiMD 服务

网路上的即时文件协作中,除了 Google Documents 系列外,HackMD 也走出了一条自...

DAY17:清单元件之实作

今天要给大家看实例,接下来用图片介绍。 首先我们先在主画面建立三个清单元件 以及新增两个客制化的画面...

【把玩Azure DevOps】Day16 Artifacts应用:让外部合作夥伴也可以从Private nuget安装Package

前面几篇文章都在提Azure DevOps Artifacts,也就是如何利用这个服务来达成私有化的...

[ Day 16 ] React Hooks 中的 useEffect

昨天介绍了在 Function Component 中该如何操作 state 的方法(附上传送门)...