24 - 建立结构化的 Log (2/4) - Elasticsearch Ingest Pipeline 资料 Index 前的转换好帮手 - 基本介绍

建立结构化的 Log 系列文章


本篇学习重点

  • Elasticserach 内建的 Ingest Pipeline 基本介绍
  • 如何使用 Ingest Pipeline
  • 使用 Ingest Pipeline 时的注意事项

Ingest Pipeline 的功用

Ingest Pipeline (撷取管道) 是一个内建在 Elasticsearch 中,文件在进入 Index 前的资料转换 (Transformation) 的工具,主要的任务就是针对透过 Indexing RESTful API 传入的文件,在真正进入 Elasticsearch Indexing 处理之前,先进行前处理,这个前处理可以像是以下几种例子:

  • 将原始的资料丰富化 (enrich),透过查找 Elasticsearch 里存放在别的 Index 里的相关资料,加入到文件之中,来丰富原有的文件。
  • 将日期格式正确的从文件中的某个栏位撷取出来,让 Elasticsearch 的 @timestamp 有正确的时间。
  • 将 IP 的栏位,透过反查 GeoIP 的资料库,加入 GeoLocation 的资讯在文件中,以利於之後能使用地图检视资料。
  • 将文字内容,依照特定的格式,拆解成结构化的 JSON 栏位与值。
  • 将 URL 拆解成包含 pathschemeportdomainquery…各个栏位。
  • 将 URL 中 Query String 里的 Key / Value,转成结构化的 JSON 栏位与值。
  • 当某个条件成立时,填加某个固定值。

Ingest Pipeline 的运作及使用方式

24-ingest-pipeline-flow

上图是 Ingest Pipeline 运作时的主要流程,在这过程中,我们分别解释每个阶段的运作及操作方式:

  • Incoming Documents (传入的文件): 在使用 Indexing API、或是 _bulk API 将文件传入至 Elasticsearch 准备进行 Indexing 时,可以指定要使用哪一组预先定义好的 Ingest Pipeline。
  • Ingest Pipeline (撷取管道): Elasticsearch 在到 Indexing 的请求时,如果有指定 Ingest Pipeline,Coordinator (协调者) Node 会把这个请求,交给 ingest node,透过定义好的 Pipeline 设定,经由当中指定的各种 Processor (处理器) 一步一步的将资料进行处理。
  • Target Index (目标索引): Ingest Pipeline 在处理完之後,会将最终的文件,透过 Coordinator 传送到 Primary Shard 所在的 Node,进行 Indexing 後续的处理。

接下来将会依照实际准备时的步骤,进行说明。

定义 Ingest Pipeline

使用 Ingest APIs

在文件进入 Elasticsearch 之前,我们必须先准备好 Ingest Pipeline 的定义,这边主要是透过 _ingest 的 API 进行设定:

PUT /_ingest/pipeline/<pipeline>
  • <pipeline>:是自己取的 pipeline 名称

提供一个实际的范例如下:

PUT /_ingest/pipeline/my-pipeline-id
{
  "version": 1,
  "description" : "My optional pipeline description",
  "processors" : [
    {
      "set" : {
        "description" : "My optional processor description",
        "field": "my-keyword-field",
        "value": "foo"
      }
    }
  ],
  "_meta": {
    "reason": "set my-keyword-field to foo",
    "serialization": {
      "class": "MyPipeline",
      "id": 10
    }
  }
}
  • Pipeline 的名字是 my-pipeline-id
  • version 是提供使用者自己记录及参考,与 ingest pipeline 本身的运作功能无关,是选用的栏位。
  • processors 里面指定 1 至多个 processors,这部份会是资料 Transform (转换) 的主要处理,每个 processors 会依照先後顺序来执行,一个 processor 做完後会将输出交给下一个 processor 进行处理,也就是这个功能取名 pipeline 的原因,Elasticsearch 中内建 30 多个 processors,详细可参考 官方文件 - Ingest Processor Reference
  • _meta 是提供给使用者自己存放自己想要额外加入的资讯所使用的,如果 pipeline 是由程序或其他机制在管理时,可以额外记录一些参考的资讯。

除了建立 pipeline 的这个 API,Ingest API 总共有提供:

  • Create or update pipeline
  • Get pipeline
  • Delete pipeline

可以参考 官方文件 - Ingest APIs 的使用说明。

使用 Kibana Ingest Node Pipeline

另外 Kibana 也有提供 UI 的设定画面,可以在 Kibana > Stake Management > Ingest > Ingest Node Pipelines 建立或管理 Ingest Pipeline:

24-kibana-ingest-pipeline

在建立 Create Pipeline 时,就能够使透过 Add a processor 进入以下的画面选择要使用的 processor 并进行相关的设定。

24-kibana-ingest-pipeline-create

上线前使用 Simulate 模拟一下

当我们依照需求定义好 Ingest Pipeline 之後,我们可以透过 _simulate API,并提供测试的文件,确认一下 Ingest Pipeline 的执行结果。

_simulate API 有提供两种模式,第一种是针对还没有建立 Pipeline 时,一并透过 API 模拟指定的 Pipeline 定义 + 测试文件,另一种是已经建立好 Pipelien 的定义,提供测试文件来试用。

Simulate 已建立好的 Pipeline

我们针对 my-pipeline-id 这个 pipeline 来测试,并提供两份文件:

POST /_ingest/pipeline/my-pipeline-id/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

可以得到回传结果:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

Simulate 指定的 Pipeline 规则 + 测试的文件

我们不用指定 Pipeline 名称,直接使用 _ingest/pipeline/_simulate 的 API,并且在 Request Body 中带入 pipeline 的定义:

POST /_ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

以下是 Simulate 的回传结果:

{
   "docs": [
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "bar"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.187Z"
            }
         }
      },
      {
         "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": {
               "field2": "_value",
               "foo": "rab"
            },
            "_ingest": {
               "timestamp": "2017-05-04T22:30:03.188Z"
            }
         }
      }
   ]
}

使用 Kibana 的 Test Pipeline

在 Kibana 之中,也有对应的 Test Pipeline 功能,可以直接填入测试的文件,来检验 Pipeline 的运作结果。

24-kibana-ingest-pipeline-test

Indexing 资料时,使用 Ingest Pipeline 的方法

当上述的方式建立好 Ingest Pipeline 之後,我们将 Document Indexing 进入 Elasticsearch 时,就可以指定要使用 Ingest Pipeline,以下是常用的几种方式:

Index API

使用 Index API 时,指定 pipeline 的名字

POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

Bulk API

使用 Bulk API 时,同样也可以指定 pipeline 的名字

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }

Update By Query

使用 Update by Query 时,也可以指定 pipeline 的名字

POST my-data-stream/_update_by_query?pipeline=my-pipeline

Reindex

Reindex 时也可以指定 pipeline 的名字

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

Index Setting 或透过 Index Tempate

在 Index Setting 中,可以透过以下的设定,来决定资料写入这个 Index 时,要透过 ingest pipeline 来处理:

  • index.default_pipeline:如果 request 没有带入指定的 pipeline,就会依照这个设定来执行。
  • index.final_pipeline:这是不论有没有其他经由 request 带入的 pipeline 或是 default_pipeline 的设定,在最後都一定会执行的的 pipeline 设定。(也就是如果有其他指定的 pipeline,这两种 pipeline 都会被执行)

因此也可以透过 Index Template 指定 Index setting 中的这两个设定值。

其他 Elastic Stack

其他 Beats、Logstash,也都会有对应的设定,能够在资料透过 Index API 或是 Bulk API 写入时,指定要使用的 Ingest Pipeline,详细请参考这些产品的官方文件说明。

使用 Ingest Pipeline 的注意事项

  • Elasticsearch Cluster 中,至少要有一个有启动 ingest 角色的 Node,如果 Ingest 的工作量很繁重的话,建议安排专门处理 Ingest 的 Node 来进行 Ingest 任务的处理,又或是使用 Logstash 等其他 ETL 工具,避免占用资源而影响 Elasticsearch 其他功能的运作。
  • 如果有启用 Security 的功能,会需要拥有 manage_pipeline 的权限,如果要从 Kibana 的 Ingest Node Pipeline 画面来操作 Ingest Pipeline 的功能的话,另外还会需要 cluster:monitor/nodes/info 的权限。
  • 在使用 Pipeline 时,版本管理也会是很重要的一件事,为了能更有效率的避免 Pipeline 的定义是旧版,而造成资料 Indexing 时是以非预期的方式处理,善用 version 的版本号码,并且将 Pipeline 的定义进行版控管理,在布署或除错时,也能多透过确认 Pipeline 的 version 来确保版本的正确性。

以上介绍了 Elasticsearch Ingest Pipeline 的基本说明,接下来会以实际的例子进行介绍,说明 Ingest Pipeline 如何协助我们将 Log 结构化。

参考资料

  1. 官方文件 - Ingest Pipelines
  2. 官方文件 - Ingest Processor Reference
  3. 官方文件 - Ingest APIs

查看最新 Elasticsearch 或是 Elastic Stack 教育训练资讯: https://training.onedoggo.com
欢迎追踪我的 FB 粉丝页: 乔叔 - Elastic Stack 技术交流
不论是技术分享的文章、公开线上分享、或是实体课程资讯,都会在粉丝页通知大家哦!


<<:  那些被忽略但很好用的 Web API / ScrollIntoView

>>:  DAY24:Broadcast receiver之简介

VMware 2V0-21.20 VCE - VMware Certified Professional (VCP) Practice Test

Getting Help With a Professional VMware VSphere 7....

Day27-OTO

嗨,各位 说到开源就想私心提一下自己小发烧很爱的flac 首先,何谓flac? flac是"...

< 关於 React: 开始打地基| useState()>

09-08-2021 本章内容 THE STATE HOOK Hook 可以做的事情 规则 使用us...

Day16:【技术篇】SQL之基本CRUD处理能力

一、前言   对於全端工程师的工作内容与心法,我在前面15天的系列文章中已简单列出并加以介绍,而这篇...