Ingest Pipeline (撷取管道) 是一个内建在 Elasticsearch 中,文件在进入 Index 前的资料转换 (Transformation) 的工具,主要的任务就是针对透过 Indexing RESTful API 传入的文件,在真正进入 Elasticsearch Indexing 处理之前,先进行前处理,这个前处理可以像是以下几种例子:
@timestamp
有正确的时间。path
、scheme
、port
、domain
、query
…各个栏位。上图是 Ingest Pipeline 运作时的主要流程,在这过程中,我们分别解释每个阶段的运作及操作方式:
_bulk
API 将文件传入至 Elasticsearch 准备进行 Indexing 时,可以指定要使用哪一组预先定义好的 Ingest Pipeline。ingest
node,透过定义好的 Pipeline 设定,经由当中指定的各种 Processor (处理器) 一步一步的将资料进行处理。接下来将会依照实际准备时的步骤,进行说明。
在文件进入 Elasticsearch 之前,我们必须先准备好 Ingest Pipeline 的定义,这边主要是透过 _ingest
的 API 进行设定:
PUT /_ingest/pipeline/<pipeline>
<pipeline>
:是自己取的 pipeline 名称提供一个实际的范例如下:
PUT /_ingest/pipeline/my-pipeline-id
{
"version": 1,
"description" : "My optional pipeline description",
"processors" : [
{
"set" : {
"description" : "My optional processor description",
"field": "my-keyword-field",
"value": "foo"
}
}
],
"_meta": {
"reason": "set my-keyword-field to foo",
"serialization": {
"class": "MyPipeline",
"id": 10
}
}
}
my-pipeline-id
。version
是提供使用者自己记录及参考,与 ingest pipeline 本身的运作功能无关,是选用的栏位。processors
里面指定 1 至多个 processors,这部份会是资料 Transform (转换) 的主要处理,每个 processors 会依照先後顺序来执行,一个 processor 做完後会将输出交给下一个 processor 进行处理,也就是这个功能取名 pipeline 的原因,Elasticsearch 中内建 30 多个 processors,详细可参考 官方文件 - Ingest Processor Reference。_meta
是提供给使用者自己存放自己想要额外加入的资讯所使用的,如果 pipeline 是由程序或其他机制在管理时,可以额外记录一些参考的资讯。除了建立 pipeline 的这个 API,Ingest API 总共有提供:
可以参考 官方文件 - Ingest APIs 的使用说明。
另外 Kibana 也有提供 UI 的设定画面,可以在 Kibana > Stake Management > Ingest > Ingest Node Pipelines 建立或管理 Ingest Pipeline:
在建立 Create Pipeline 时,就能够使透过 Add a processor 进入以下的画面选择要使用的 processor 并进行相关的设定。
当我们依照需求定义好 Ingest Pipeline 之後,我们可以透过 _simulate
API,并提供测试的文件,确认一下 Ingest Pipeline 的执行结果。
_simulate
API 有提供两种模式,第一种是针对还没有建立 Pipeline 时,一并透过 API 模拟指定的 Pipeline 定义 + 测试文件,另一种是已经建立好 Pipelien 的定义,提供测试文件来试用。
我们针对 my-pipeline-id
这个 pipeline 来测试,并提供两份文件:
POST /_ingest/pipeline/my-pipeline-id/_simulate
{
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
可以得到回传结果:
{
"docs": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.187Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.188Z"
}
}
}
]
}
我们不用指定 Pipeline 名称,直接使用 _ingest/pipeline/_simulate
的 API,并且在 Request Body 中带入 pipeline
的定义:
POST /_ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
以下是 Simulate 的回传结果:
{
"docs": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.187Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.188Z"
}
}
}
]
}
在 Kibana 之中,也有对应的 Test Pipeline 功能,可以直接填入测试的文件,来检验 Pipeline 的运作结果。
当上述的方式建立好 Ingest Pipeline 之後,我们将 Document Indexing 进入 Elasticsearch 时,就可以指定要使用 Ingest Pipeline,以下是常用的几种方式:
使用 Index API 时,指定 pipeline
的名字
POST my-data-stream/_doc?pipeline=my-pipeline
{
"@timestamp": "2099-03-07T11:04:05.000Z",
"my-keyword-field": "foo"
}
使用 Bulk API 时,同样也可以指定 pipeline
的名字
PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:06.000Z", "my-keyword-field": "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-07T11:04:07.000Z", "my-keyword-field": "bar" }
使用 Update by Query 时,也可以指定 pipeline
的名字
POST my-data-stream/_update_by_query?pipeline=my-pipeline
Reindex 时也可以指定 pipeline
的名字
POST _reindex
{
"source": {
"index": "my-data-stream"
},
"dest": {
"index": "my-new-data-stream",
"op_type": "create",
"pipeline": "my-pipeline"
}
}
在 Index Setting 中,可以透过以下的设定,来决定资料写入这个 Index 时,要透过 ingest pipeline 来处理:
index.default_pipeline
:如果 request 没有带入指定的 pipeline
,就会依照这个设定来执行。index.final_pipeline
:这是不论有没有其他经由 request 带入的 pipeline
或是 default_pipeline
的设定,在最後都一定会执行的的 pipeline 设定。(也就是如果有其他指定的 pipeline,这两种 pipeline 都会被执行)因此也可以透过 Index Template 指定 Index setting 中的这两个设定值。
其他 Beats、Logstash,也都会有对应的设定,能够在资料透过 Index API 或是 Bulk API 写入时,指定要使用的 Ingest Pipeline,详细请参考这些产品的官方文件说明。
ingest
角色的 Node,如果 Ingest 的工作量很繁重的话,建议安排专门处理 Ingest 的 Node 来进行 Ingest 任务的处理,又或是使用 Logstash 等其他 ETL 工具,避免占用资源而影响 Elasticsearch 其他功能的运作。manage_pipeline
的权限,如果要从 Kibana 的 Ingest Node Pipeline 画面来操作 Ingest Pipeline 的功能的话,另外还会需要 cluster:monitor/nodes/info
的权限。version
的版本号码,并且将 Pipeline 的定义进行版控管理,在布署或除错时,也能多透过确认 Pipeline 的 version
来确保版本的正确性。以上介绍了 Elasticsearch Ingest Pipeline 的基本说明,接下来会以实际的例子进行介绍,说明 Ingest Pipeline 如何协助我们将 Log 结构化。
查看最新 Elasticsearch 或是 Elastic Stack 教育训练资讯: https://training.onedoggo.com
欢迎追踪我的 FB 粉丝页: 乔叔 - Elastic Stack 技术交流
不论是技术分享的文章、公开线上分享、或是实体课程资讯,都会在粉丝页通知大家哦!
<<: 那些被忽略但很好用的 Web API / ScrollIntoView
>>: DAY24:Broadcast receiver之简介
Getting Help With a Professional VMware VSphere 7....
嗨,各位 说到开源就想私心提一下自己小发烧很爱的flac 首先,何谓flac? flac是"...
09-08-2021 本章内容 THE STATE HOOK Hook 可以做的事情 规则 使用us...
一、前言 对於全端工程师的工作内容与心法,我在前面15天的系列文章中已简单列出并加以介绍,而这篇...