Day8 NiFi - Processor Group

前面已经讲完 Processor 和 Connection 两个重要的 Componenet,我们就可以透过这两个去建立基本的 Data Pipeline。但有时候我们会建立很多 Data Pipeline 在 NiFi,有些流程甚至是重叠的,这样会觉得不太符合效益,所以就要透过今天的主角 - Processor Group 来做处理。

What is the Processor Group?

当 Date Pipeline 复杂且多样化时,有些小流程可能会有重复的建置,此时可以透过 Processor Group(接下来简称 PG) 来做处理,简单来说就是变成 Module,然後整合到 Data Pipeline 来做一个重复使用的功能。

PG 除了有作为 Module 的功能之外,他也有几个额外重点,其实之前都有提到他,这边再稍微列一下给各位:

  • 可以用来做 Team 或 Project 的划分,加强组织管理与权限控管。
  • NiFi Registry 版本控管的最小单位。

如何操作?


从上面的 gif 档可以看到如何建立 PG,而当我们移动到 PG 内部时,可以注意到左下角也会改变成我们目前所属的阶层(很像 Folder 的概念)。

场景应用

这边我来带一个场景应用,一样拿kaggle titanic 来做一个应用,这里就做基本一些简单的前处理就好,这里示范的前处理步骤如下:

1. 将 Sex 栏位转换成数值
2. 依据 Embarked 栏位分成 3 个 connection 出来

大致上的 Pipeline 会长如下图:

其中中间的 PG (titanic_preprocess) 内部会长如下图:

PG 内部

接下来先针对 PG 内部来做讲解:

  1. 需要设定 input, output port
    如果是要将 PG 视为 Module 做使用且成为 Pipeline 其中的一个流程,则必须在设计 PG 时,需要指定 inputoutput 这两个 port,如此一来才有办法将 FlowFiles 从上游流入,且再从下游流出。

所以如同红框所示即可知道如何加入 inputoutput

而在 PG 内可以同时存在着多的 inputoutput,再由外部建立 connection 时来决定要将 FlowFiles 流入到 PG 的哪一个 port。

  1. 加入 SplitRecord Processor

    这个 Processor 是用来将内容做 split,我们可以去指定 split 的行数,其中参数如下:
  • Record Reader
    要用哪一种 format 来读档案内容,因为档案为 csv,所以这边选择 CSVReader。
  • Record Writer
    要以哪一种方式写入成 FlowFiles 格式,这边一样选择 CSVRecordWriter。
  • Records Per Split
    要以 n 笔作为 split 单位,如果设定为 1,则就会1笔1笔的将资料转成 FlowFiles。

其中CSVReaderCSVRecordWriter 是属於 Controller Service 的一种,所以当选完指定类型时,还要点选右手边的箭头,此时会看到如下画面:

接着按下 闪电 的符号,就能正式启用,为何要这样做呢?详细原因在下一节 Controller Service 会再介绍到。

  1. 加入 UpdateRecord Processor

    这个 Processor 就是用来更新某一个栏位底下的 Value,如同我们这次的场景希望将 Sex 这个栏位原本的 female 转成 0, male 转成 1。

一样当中有几个参数要去做设定:

  • Record Reader
    要用哪一种 format 来读档案内容,因为档案为 csv,所以这边选择 CSVReader。
  • Record Writer
    要以哪一种方式写入成 FlowFiles 格式,这边则选择 JsonRecordWriter,因为我们等等要将一些栏位转成 Attribute。

这里会看到我额外加入了一个 /Sex 这个 property,其中他的value 是:

${field.Sex:equals('female'):ifElse(0,1)}

这个语法是 NiFi 自己的语法,叫做 NiFi Expression Language,是可以帮助我们针对 Flowfiles 的 attributes 和 content 作处理的。
详细的写法也会在後续介绍,这里简单的意思是我们想要取得 Sex 的栏位,且若底下有 female 这个 value 时转成 0,否则转成 1,接着再写回 Sex 栏位作替代,所以 key 才会是 /Sex(斜线是这个 Processor 的规定,这样才有办法找到对应的栏位)。

  1. 加入 EvaluateJsonPath Processor

    在前一个 UpdateRecord Processor,已将 FlowFiles 转成 Json 格式,而这边的 EvaluateJsonPath 就是可以帮我们将 content 的 key 转成 attributes。举例来说,假如有一个如下的 content:
[{"PassengerId":889,"Survived":0,"Pclass":3,"Name":"Johnston, Miss. Catherine Helen \"Carrie\"","Sex":"1","Age":null,"SibSp":1,"Parch":2,"Ticket":"W./C. 6607","Fare":23.45,"Cabin":null,"Embarked":"S"}]

那我们就可以透过上图中的 embark 将特定的 key 转成 FlowFiles 的 attributes:

$[0].Embarked 

如此一来经过该 Processor 之後的 FlowFiles,都会多带一个 embark 的 attributes,而 value 就会是 content 对应到的 value,以这个例子就会是 S。

  1. 加入 RouteOnAttributes Processor

    这个 Processor 是可以帮由我们根据 Attribute 的 value 来自定义後续的 Connection,所以可以看到我的设定:
embark_c: ${embark:equals("C")}
embark_q: ${embark:equals("Q")}
embark_s: ${embark:equals("S")}

如此一来,这个 Processor 就会产生 embark_c, embark_qembark_s 这三个 Connection,因此我们再回来看一次 PG 的图下方橘框:

就可看到这三个 Connection 以及对接对应的 Port。

对接 PG

  • 与 PG 的 input 建立 Connection

    根据上图红框,我们可以看到与 PG 建立 Connection 的结果,而通常与上游 Processor 对接对应到的都会是我们在 PG 内设定好的 input port

所以当我们在建立连线时,就可以看到如下图所示的状况,我们可在右手边决定要选择的 input port

  • 与 PG 的 output 建立 Connection

    根据上图的红框,则可看到 PG 与下游的 Processor 建立 Connection 的结果,而这个建立会对应到我们在 PG 内建立好的 output port

一样从下图左方,我们可选择要与下游 Processor 对接的 port 为何,藉此建立出对应的 Connection。

小总结

透过上面的描述,我们可以得知 PG 要如何建立,以及一些细节的设定,还有如何与 Data Pipeline 来做一个整合,像是 inputoutput 这些 port 的设定与对接,虽然看起来好像操作很简单,但这是在 NiFi 中一个非常重要的操作与概念,希望读者们若有使用的话一定要将此学习起来,这会帮助你更轻松地去组织与建构出有 performance、有可读性的 Data Pipeline。

最後,PG 的介绍就先到这边一个段落,明天会来介绍 - Controller Service。

Reference


<<:  [DAY15]k8s必备良药-Lens

>>:  day 15 - 从执行时间开始优化

day11 Kotlin coroutine 花生什麽事?

前面我讲10篇了,告诉你们coroutine是什麽,怎麽用,如何切thread,和她背後发生什麽事 ...

Day 15-infrastructure 也可以 for each 之二: for_each meta-argument

infrastructure 也可以 for each 之二 课程内容与代码会放在 Github 上...

修复:Windows 10中的USB装置无法辨识/识别

您在经常使用USB随身碟、SD记忆卡、外接硬碟时,可能电脑会弹出「USB装置无法识别。连结到这部电脑...

Day 27:碰到困难问题,演算法也救不了?

上一回我们说旅行推销员问题(TSP)是一个NP困难问题,没有快速的演算法可以解决。 那一个问题怎样叫...

Day12 - 搜寻文章作者及合并方法

今天做点简单的事情,先把搜寻作者的部份给加进来。 Layout也跟昨天一样先多加一行: 基本上前置作...