Day27 NiFi 场景应用范例 (二)

今天要带大家做另外一个简单的场境应用,我们继续沿用昨天所处理的 parquet File 来做今天的小实作,大致上今天要实作的内容如下:

读取 local 端的 parquet file,并且依据 Stage 栏位的值,只选择值 Armor、Champion 和 Mega 的资料送到 AWS SQS。

事前准备

这边的事前准备就是记得在 AWS 建立一个要来用的 SQS,并且复制 SQS 的 URL,实际的画面如下:

How to Build?

这次的范例的 data pipeline 大致会长的如下图:

GetFile

这个 Processor 是可以让我们读取 Local 端的某一个 Folder 下的档案,来看一下如何设定:

这边以我的范例来说,我是将前一天实作完的档案暂存到 /tmp/data 这个路径下,所以只要在 Input Directory 这个 Property 设定好,他就会将底下的档案读上来做使用。

SplitRecord

因为前一个 Processor 只是将档案读取变成一个 FlowFiles 而已,尚未将里面的资料取出来,所以我们可以透过该 Processor 做到这件事情:

原先的档案为 Parquet 格式,所以我们以 ParquetReader 的方式来做读取,并且以一笔 Record为单位,接着再以 JsonRecordSetWriter 来转换成 Json 格式来给下游 Processor 做处理,因为後续我们需要透过 Json 格式来做栏位的判断。

因此我们可以看到经过这个 Processor 的 Content,都会转换成 Json 格式,且一笔为单位,内容如下:

EvaluateJsonPath

这个 Processor 是可以让我们去解析将原先 Content 的某一个栏位转换成 Attribute,所以来看一下设定:

  • Destination: 代表要转换的目的地,这边我们先选择城 flowfile-attribute
  • Stage: 是我新增的一个 Property,後面的 $.Stage 代表他会去解析进来的 FlowFiles 中的 Stage 这个栏位,并且带到名为 Stage 的 Attribute。

所以经过该 Processor 的 Flowfiles,我们会发现都会多带一个名为 Stage 的 Attribute:

有了 Stage 这个 Attribute 之後,原则上该 Attribute 的值会跟 Content 内的 Stage 这个栏位值相同,接着就可以做过滤的动作,所以就会用到 RouteOnAttribute 这个 Processor。

RouteOnAttribute

该 Processor 是让我们可依据 FlowFiles 的状况动代增加下游的 Connection 的 Route,我们先看一下原先的 Processor 设定只会有 unmatched 的 Route:

但是我们可以在 Property 增加更多的条件,如下设定:

这边我们加入了 7 个 Property,分别对应的是:

0, Baby
1, In-Training
2, Rookie
3, Champion
4, Ultimate
5, Mega
6, Ultra
7, Armor

一但设定完成之後,我们会发现 Route 会多出这些刚刚设定的 Property Name:

接着我们在连接下游 Processor 的时候,就可以选定符合哪些条件的 Route 可以连接到下游的 Processor。
以接下来的 PutSQS 的范例为例,我们只需要Armor、Champion 和 Mega 的资料送到 AWS SQS 即可其他的都不要,所以既勾选对应的 Route 即可。

接着就可以看到 RouteOnAttribute 和 PutSQS 之间就只会有这三个的 Connection:

然後其他不会用到的我们先传送到 Wait Processor

PutSQS

一切准备就绪之後,接下来就可以设定 PutSQS 这个 Processor,还记得一开始要你们先事前建立好 SQS,这边就会用到了:

  • Queue URL: 这里就填上你刚刚建立好 SQS URL
  • AWS Credentials Provider service: 对接好 AWS Controller Service
  • Region: 设定好 AWS Region

上述的设定完成,就可以将符合条件的资料送到 AWS SQS 了。

Wait

Wait 这个 Processor 其实就是一个暂停的 Processor。通常会是什麽时候会用他呢?

  1. 通常用於开发的阶段,因为 Date Pipeline 是由多个 Processor 所建构而成的,所以会需要一个一个 Processor 做设定与测试,所以会在需要做测试的 Processor 的下游接一个 Wait Processor。
  2. 另一个用法就是会用在不需要的资料,以这次的范例来说,我们就可以把不符合条件的 FlowFiles 先送到 Wait,主要是用来确定资料确实有依照我们的限制来做判断,来进一步地决定下游的流向。

小总结

上述就是我带给各位的第二个范例,这些看似简单的 Processor,其实都是很常用的,所以希望透过这两天的小实作那大家可以对於 NiFi 在做 Data Pipeline 的设定与流程可以有更多的体悟与理解。

那明天会介绍一个国外企业是如何使用 Apache NiFi 的小案例分享,以及他的架构是如何做的,对於未来要导入该 Tool 的企业或许有一定的帮助。

Reference


<<:  Proxmox VE 设定客体机高可用性

>>:  [Day27] 找回密码API、重设密码API – urls、测试阶段

[从0到1] C#小乳牛 练成基础程序逻辑 Day 2 - Visual Studio 2022 开发环境建立 64位元

VS 2022 Preview | 64位元 | Browser IDE 🐄点此填写今日份随堂测验 ...

ThinkPHP V5.1 新增控制器

还不会创建ThinkPHP V5.1专案的朋友们可以先去看看创建ThinkPHP V5.1专案。 何...

Day 15 - [语料库模型] 03-语料读取&资料格式转换

今天的主题是介绍如何读取 CSV (之前从各个网站爬下来的问答集),并将资料转成後面制作语料库模型要...

Day25 Vue 双向绑定 vs 单向绑定

什麽是单向绑定什麽是双向绑定?简单来说一个只有单方面的传送,另一个则是可以来回传,wow讲完了,今天...

Dungeon Mizarka 001

第一人称地城冒险游戏介绍 第一人称地城冒险游戏(FP Dungeon Crawler, FPDC)类...