前面我们已经介绍完 FlowFiles 了,接下来就是可以一步一步地去建置我们的 Data Pipeline,今天的主角是『Processors』。
在 Day2 的时候,就有提到 Processor 这个名词,它同时也是撑起在 NiFi 的 Data Pipeline 的重要 componenet 之一。根据官方文件的左手边可以看到『Processors』底下的种类非常非常多,如果这次的系列文30天都全部介绍的话,还不一定讲得完。
所以这里我先帮各位读者们去整理一份归类,先有一个大概的认识:
Processor Categories | Intorduction | Example |
---|---|---|
Data Ingest Processors | 主要就是用来取得资料的 Processors,可能透过 files, ftp, db 等方式 | GenerateFlowFiles, ListFiles, GetFile, GetKafka, GetMongo, GetTwitter... |
Data Transform Processors | 主要就是在做一些 transform, replact, update等操作的 Processors | ReplaceText, ConvertRecord, UpdateRecord, ConvertJsonToSQL... |
Data Egress & Sending Data Processors | 主要 ouput 或 传送资料到下一个 Destination 的 Processors | PutSlack, PutEmail, PutFile, PutFTP, PutKafka.. |
Routing & Mediation Processors | 以控制 FlowFiles 该往哪一个 connection 流向的 Processors | RouteOnAttribute, RouteOnContent... |
DB Access Processors | 以 Access DB 操作的 Processors(主要偏向可SQL指令的DB) | PutSQL, ExecuteSQL, ListDatabaseTables, QueryDatabaseTable... |
Attribute Extraction Processors | 以 FlowFiles 的 attributes 操作的 Processors | EvaluateJsonPath, EvaluateXPath, ExtractText, UpdateAttribute, LogAttribute... |
Execute Programming Processors | 执行第三方程序(ex. python, go, etc.) 操作的 Processors | ExecuteProcess, ExecuteStreamCommand... |
Splitting & Aggregation Processors | 切分资料或计算处理的 Processors | SplitText, SplitJson, SplitRecord, MergeContent.... |
HTTP & UDP Processors | 透过 HTTP 或 UDP 做资料操作的 Processors,通常用於 API 呼叫居多 | GetHTTP, ListentHTTP, PostHTTP, PutHTTP, PutUDP... |
AWS Processors | AWS 相关操作的 Processors | ListS3, FetchS3Object, PutS3Object, PutSNS, GetSNS, PutSQS, PutDynamoDB, PutLambda... |
GCP Processors | GCP 相关操作的 Processors | ListGCSBucket, PutGCSObject, FetchGCSObject, DeleteGCSObject, ConsumeGCPPubSub, PublishGCPPubSub, PutBigQueryBatch, PutBigQueryStreaming... |
我想各位读者看到这些应该就已经头昏眼花了,更何况在这次的系列文当中更不可能全部提到,所以我又再简化了一些重点,读者们就可以透过我的重点即可推敲出大多出 Processor 的用途了:
GetXXX
通常代表着『取得』资料转成 FlowFiles的意思,ex. GetFile,从 Local file 取得资料; GetMongo 就是从 MongoDB 取得资料。
PutXXX:
通常代表着『写入』、『传送』的意思,ex. PusSlack 传送资讯到 slack; PutSQS 将资料传送到 AWS SQS; PutKafka 将资料传送到 Kafka。
FetchXXX:
通常代表『取得资料内容』,这通常较常出现在 File 类型,ex. FetchFile, FetchS3Object, FetchGCSObject。你可能会问这和『GetXXX』有什麽差异呢? 以 GetFile 是用来取得 File 的 size, permission, last modified等资讯,而 FetchFile 而是真实将档案内容以 一个row 的方式转成一个 FlowFiles,但只有 File 有这样的差别。
ListXXX:
通常会列出我们指定的 Folder 底下有哪些 Files,并将 Path 回传出来, ex. ListFiles 回传本机端的某一个 Folder 底下全部档案; ListS3 则回传某一个 AWS S3 Bucket 下的 folder下的所有档案等。
DeleteXXX:
就是很简单的删除资料,ex. DeleteGCSObject 就是删除 GCS 的档案; DeleteSQS 就是删除 SQS等。
RouteOnXXX:
用来决定 Flowfiles 留下的条件,ex. RouteOnAttributes 代表依据 Flowfiles 某一个 attribute 的 value 来决定接下来要走哪一个 Connection。
QueryXXX:
就是对 DB 做资料查询, ex. QueryCassandra 对 Cassandra 做查询; QueryDatabaseTable 对 DB 做查询,但需要搭配 JDBC Driver,如果要对 MySQL 就要指定 MySQL Driver。
ExecuteXXX:
执行 DB Query 或执行 Programing, ex. ExecuteSQL 对 DB 执行 SQL 指令; ExecuteStreamCommand 可用来执行第三方的程序语言等
你会发现,其实在 NiFi 的 Processor 命名其实还蛮单纯的,通常就是一个『动作』搭配一个『目的』,所以只要掌握这些原则,其实大多数的 Processor 都能大概知道他的用途,剩下例外的就在查 Document 就可以了。
那接下来就来了解一下如何操作 Processor 吧!
从上面的 gif 可以看到我们是如何将 Processor 拉到主画面的操作,这里我也用截图的方式来呈现一次
在画面最左方可以看到 『Processor』的图示,拖拉到主画面
接着就会呈现所有 Processor 的视窗,根据你的选择点选你要的 Processor,且点下 Add 就会有你选择的 Processor 了
透过以上简单两的步骤,就可加入我们要的 Processor,但其实 Processor 还是有很多细节,我们继续看下去
我们可以在 Processor 点击两下就可以看到更细节的设定,就会呈现如下画面:
SETTINGS
这个主要会包含几个部分,Processor 命名、Processor Type等,其中最重要的是 Automatically Terminate Relationships
,这部分就会得知出该 Processor 可与下游 Processor 建立哪些 Connection,但并不是所有 Connection 都一定要建立,你可以在这边的设定勾选起来,就代表 terminate,後续 NiFi 也就不会要求你一定要建立这个 Connection 了
SCHEDULE
这里就是要设定 Schedule 的排程,在 NiFi 有分成两种方式:
Run Schedule
,直接指定单位,ex. 1 hour, 10 minute 就分别代表每一小时、每10分钟 triggerㄡRun Schedule
,像 linux 一样的去设定排成时间,ex. 0 0 13 * * ?
代表安排在每天下午1:00运行。除了以上设定之外,还有以下的设定可以注意:
PROPERTIES
接下来这个部分应该是 Processor 最重要且最复杂的地方,也就是 Processor 的设定,每一个不同 Type 的 Processor 会有不同的 properties 要去做设定,其中黑色粗体的就是必要设定的 key,有些 Processor 的 properties key 很多很复杂,所以通常就要搭配着 Document 来去做确认跟设定对的 value。未来我这边套用场境时会挑选简单的 Processor 来让大家熟悉,随着操作熟悉之後,其实在设定上也会变得平易近人。
COMMENT
这部分就是来针对该 Processor 做一些简单的说明和注解,帮助你的团队成员或是後续你来检视的时候,就能知道当初为什麽要采用这个 Processor 的原因和目的。
我想经过今天的简介与简单的操作,想必读者们对於如何使用 Processor 有基本的认识了吧!还没很熟悉的人没关系,不要急慢慢来。我相信你一定是需要套用真实的场景和案例,你就会比较知道如何上手了,再让我们等一下,待我在花个几天把全部 Componenet 操作个别都介绍完之後,我们就可以全部串在一起做使用了,到时候相信你一定可以使用得很得心应手!
DAY8 MongoDB 批次操作(bulk wirte) 与 Operators bulk wri...
今天我们简单来整理一下,这几个星期从 《 Patterns of Enterprise Applic...
既然使用云端服务,首要处理的问题就是现有的服务、web的调校怎麽办?是不是需要重新在云端上架设呢?...
前情提要 上回说了希望选择学习火属性魔法後,艾草带我走到一棵大树下。 艾草:「来尝试用自己现有的魔力...
OS模组(Python内建) 说明 : os模组是一种与作业系统相关的模组,提供数十种与作业系统沟通...