前面已经讲完 Processor 和 Connection 两个重要的 Componenet,我们就可以透过这两个去建立基本的 Data Pipeline。但有时候我们会建立很多 Data Pipeline 在 NiFi,有些流程甚至是重叠的,这样会觉得不太符合效益,所以就要透过今天的主角 - Processor Group 来做处理。
当 Date Pipeline 复杂且多样化时,有些小流程可能会有重复的建置,此时可以透过 Processor Group(接下来简称 PG) 来做处理,简单来说就是变成 Module,然後整合到 Data Pipeline 来做一个重复使用的功能。
PG 除了有作为 Module 的功能之外,他也有几个额外重点,其实之前都有提到他,这边再稍微列一下给各位:
从上面的 gif 档可以看到如何建立 PG,而当我们移动到 PG 内部时,可以注意到左下角也会改变成我们目前所属的阶层(很像 Folder 的概念)。
这边我来带一个场景应用,一样拿kaggle titanic 来做一个应用,这里就做基本一些简单的前处理就好,这里示范的前处理步骤如下:
1. 将 Sex 栏位转换成数值
2. 依据 Embarked 栏位分成 3 个 connection 出来
大致上的 Pipeline 会长如下图:
其中中间的 PG (titanic_preprocess) 内部会长如下图:
接下来先针对 PG 内部来做讲解:
input
和 output
这两个 port,如此一来才有办法将 FlowFiles 从上游流入,且再从下游流出。所以如同红框所示即可知道如何加入 input
和 output
:
而在 PG 内可以同时存在着多的 input
和 output
,再由外部建立 connection 时来决定要将 FlowFiles 流入到 PG 的哪一个 port。
其中CSVReader
和 CSVRecordWriter
是属於 Controller Service
的一种,所以当选完指定类型时,还要点选右手边的箭头,此时会看到如下画面:
接着按下 闪电
的符号,就能正式启用,为何要这样做呢?详细原因在下一节 Controller Service 会再介绍到。
Sex
这个栏位原本的 female 转成 0, male 转成 1。一样当中有几个参数要去做设定:
这里会看到我额外加入了一个 /Sex
这个 property,其中他的value 是:
${field.Sex:equals('female'):ifElse(0,1)}
这个语法是 NiFi 自己的语法,叫做 NiFi Expression Language
,是可以帮助我们针对 Flowfiles 的 attributes 和 content 作处理的。
详细的写法也会在後续介绍,这里简单的意思是我们想要取得 Sex
的栏位,且若底下有 female
这个 value 时转成 0,否则转成 1,接着再写回 Sex
栏位作替代,所以 key 才会是 /Sex
(斜线是这个 Processor 的规定,这样才有办法找到对应的栏位)。
[{"PassengerId":889,"Survived":0,"Pclass":3,"Name":"Johnston, Miss. Catherine Helen \"Carrie\"","Sex":"1","Age":null,"SibSp":1,"Parch":2,"Ticket":"W./C. 6607","Fare":23.45,"Cabin":null,"Embarked":"S"}]
那我们就可以透过上图中的 embark
将特定的 key 转成 FlowFiles 的 attributes:
$[0].Embarked
如此一来经过该 Processor 之後的 FlowFiles,都会多带一个 embark
的 attributes,而 value 就会是 content 对应到的 value,以这个例子就会是 S。
embark_c: ${embark:equals("C")}
embark_q: ${embark:equals("Q")}
embark_s: ${embark:equals("S")}
如此一来,这个 Processor 就会产生 embark_c
, embark_q
和 embark_s
这三个 Connection,因此我们再回来看一次 PG 的图下方橘框:
就可看到这三个 Connection 以及对接对应的 Port。
input port
。所以当我们在建立连线时,就可以看到如下图所示的状况,我们可在右手边决定要选择的 input port
。
output port
。一样从下图左方,我们可选择要与下游 Processor 对接的 port 为何,藉此建立出对应的 Connection。
透过上面的描述,我们可以得知 PG 要如何建立,以及一些细节的设定,还有如何与 Data Pipeline 来做一个整合,像是 input
和 output
这些 port 的设定与对接,虽然看起来好像操作很简单,但这是在 NiFi 中一个非常重要的操作与概念,希望读者们若有使用的话一定要将此学习起来,这会帮助你更轻松地去组织与建构出有 performance、有可读性的 Data Pipeline。
最後,PG 的介绍就先到这边一个段落,明天会来介绍 - Controller Service。
前面我讲10篇了,告诉你们coroutine是什麽,怎麽用,如何切thread,和她背後发生什麽事 ...
infrastructure 也可以 for each 之二 课程内容与代码会放在 Github 上...
您在经常使用USB随身碟、SD记忆卡、外接硬碟时,可能电脑会弹出「USB装置无法识别。连结到这部电脑...
上一回我们说旅行推销员问题(TSP)是一个NP困难问题,没有快速的演算法可以解决。 那一个问题怎样叫...
今天做点简单的事情,先把搜寻作者的部份给加进来。 Layout也跟昨天一样先多加一行: 基本上前置作...