Day6 NiFi - Processors

前面我们已经介绍完 FlowFiles 了,接下来就是可以一步一步地去建置我们的 Data Pipeline,今天的主角是『Processors』。

Processors Category

在 Day2 的时候,就有提到 Processor 这个名词,它同时也是撑起在 NiFi 的 Data Pipeline 的重要 componenet 之一。根据官方文件的左手边可以看到『Processors』底下的种类非常非常多,如果这次的系列文30天都全部介绍的话,还不一定讲得完。

所以这里我先帮各位读者们去整理一份归类,先有一个大概的认识:

Processor Categories Intorduction Example
Data Ingest Processors 主要就是用来取得资料的 Processors,可能透过 files, ftp, db 等方式 GenerateFlowFiles, ListFiles, GetFile, GetKafka, GetMongo, GetTwitter...
Data Transform Processors 主要就是在做一些 transform, replact, update等操作的 Processors ReplaceText, ConvertRecord, UpdateRecord, ConvertJsonToSQL...
Data Egress & Sending Data Processors 主要 ouput 或 传送资料到下一个 Destination 的 Processors PutSlack, PutEmail, PutFile, PutFTP, PutKafka..
Routing & Mediation Processors 以控制 FlowFiles 该往哪一个 connection 流向的 Processors RouteOnAttribute, RouteOnContent...
DB Access Processors 以 Access DB 操作的 Processors(主要偏向可SQL指令的DB) PutSQL, ExecuteSQL, ListDatabaseTables, QueryDatabaseTable...
Attribute Extraction Processors 以 FlowFiles 的 attributes 操作的 Processors EvaluateJsonPath, EvaluateXPath, ExtractText, UpdateAttribute, LogAttribute...
Execute Programming Processors 执行第三方程序(ex. python, go, etc.) 操作的 Processors ExecuteProcess, ExecuteStreamCommand...
Splitting & Aggregation Processors 切分资料或计算处理的 Processors SplitText, SplitJson, SplitRecord, MergeContent....
HTTP & UDP Processors 透过 HTTP 或 UDP 做资料操作的 Processors,通常用於 API 呼叫居多 GetHTTP, ListentHTTP, PostHTTP, PutHTTP, PutUDP...
AWS Processors AWS 相关操作的 Processors ListS3, FetchS3Object, PutS3Object, PutSNS, GetSNS, PutSQS, PutDynamoDB, PutLambda...
GCP Processors GCP 相关操作的 Processors ListGCSBucket, PutGCSObject, FetchGCSObject, DeleteGCSObject, ConsumeGCPPubSub, PublishGCPPubSub, PutBigQueryBatch, PutBigQueryStreaming...

我想各位读者看到这些应该就已经头昏眼花了,更何况在这次的系列文当中更不可能全部提到,所以我又再简化了一些重点,读者们就可以透过我的重点即可推敲出大多出 Processor 的用途了:

  1. GetXXX
    通常代表着『取得』资料转成 FlowFiles的意思,ex. GetFile,从 Local file 取得资料; GetMongo 就是从 MongoDB 取得资料。

  2. PutXXX:
    通常代表着『写入』、『传送』的意思,ex. PusSlack 传送资讯到 slack; PutSQS 将资料传送到 AWS SQS; PutKafka 将资料传送到 Kafka。

  3. FetchXXX:
    通常代表『取得资料内容』,这通常较常出现在 File 类型,ex. FetchFile, FetchS3Object, FetchGCSObject。你可能会问这和『GetXXX』有什麽差异呢? 以 GetFile 是用来取得 File 的 size, permission, last modified等资讯,而 FetchFile 而是真实将档案内容以 一个row 的方式转成一个 FlowFiles,但只有 File 有这样的差别。

  4. ListXXX:
    通常会列出我们指定的 Folder 底下有哪些 Files,并将 Path 回传出来, ex. ListFiles 回传本机端的某一个 Folder 底下全部档案; ListS3 则回传某一个 AWS S3 Bucket 下的 folder下的所有档案等。

  5. DeleteXXX:
    就是很简单的删除资料,ex. DeleteGCSObject 就是删除 GCS 的档案; DeleteSQS 就是删除 SQS等。

  6. RouteOnXXX:
    用来决定 Flowfiles 留下的条件,ex. RouteOnAttributes 代表依据 Flowfiles 某一个 attribute 的 value 来决定接下来要走哪一个 Connection。

  7. QueryXXX:
    就是对 DB 做资料查询, ex. QueryCassandra 对 Cassandra 做查询; QueryDatabaseTable 对 DB 做查询,但需要搭配 JDBC Driver,如果要对 MySQL 就要指定 MySQL Driver。

  8. ExecuteXXX:
    执行 DB Query 或执行 Programing, ex. ExecuteSQL 对 DB 执行 SQL 指令; ExecuteStreamCommand 可用来执行第三方的程序语言等

你会发现,其实在 NiFi 的 Processor 命名其实还蛮单纯的,通常就是一个『动作』搭配一个『目的』,所以只要掌握这些原则,其实大多数的 Processor 都能大概知道他的用途,剩下例外的就在查 Document 就可以了。

那接下来就来了解一下如何操作 Processor 吧!

How to use Processors?

如何加入 Processors


从上面的 gif 可以看到我们是如何将 Processor 拉到主画面的操作,这里我也用截图的方式来呈现一次

  1. 在画面最左方可以看到 『Processor』的图示,拖拉到主画面

  2. 接着就会呈现所有 Processor 的视窗,根据你的选择点选你要的 Processor,且点下 Add 就会有你选择的 Processor 了

透过以上简单两的步骤,就可加入我们要的 Processor,但其实 Processor 还是有很多细节,我们继续看下去

Processors Config 细节

我们可以在 Processor 点击两下就可以看到更细节的设定,就会呈现如下画面:

  1. SETTINGS

    这个主要会包含几个部分,Processor 命名、Processor Type等,其中最重要的是 Automatically Terminate Relationships,这部分就会得知出该 Processor 可与下游 Processor 建立哪些 Connection,但并不是所有 Connection 都一定要建立,你可以在这边的设定勾选起来,就代表 terminate,後续 NiFi 也就不会要求你一定要建立这个 Connection 了

  2. SCHEDULE

    这里就是要设定 Schedule 的排程,在 NiFi 有分成两种方式:

  • Timer Driven
    搭配 Run Schedule,直接指定单位,ex. 1 hour, 10 minute 就分别代表每一小时、每10分钟 triggerㄡ
  • Crontab Driven
    搭配 Run Schedule,像 linux 一样的去设定排成时间,ex. 0 0 13 * * ? 代表安排在每天下午1:00运行。

除了以上设定之外,还有以下的设定可以注意:

  • Concurrent Tasks
    可以控制 Processor 将使用的 Concurrent 数量。简单来说,用来决定 Processor 可同时处理多少个 FlowFiles,增加该设定能使 Processor 同时 处理更多数据。
  1. PROPERTIES

    接下来这个部分应该是 Processor 最重要且最复杂的地方,也就是 Processor 的设定,每一个不同 Type 的 Processor 会有不同的 properties 要去做设定,其中黑色粗体的就是必要设定的 key,有些 Processor 的 properties key 很多很复杂,所以通常就要搭配着 Document 来去做确认跟设定对的 value。未来我这边套用场境时会挑选简单的 Processor 来让大家熟悉,随着操作熟悉之後,其实在设定上也会变得平易近人。

  2. COMMENT

    这部分就是来针对该 Processor 做一些简单的说明和注解,帮助你的团队成员或是後续你来检视的时候,就能知道当初为什麽要采用这个 Processor 的原因和目的。

小总结

我想经过今天的简介与简单的操作,想必读者们对於如何使用 Processor 有基本的认识了吧!还没很熟悉的人没关系,不要急慢慢来。我相信你一定是需要套用真实的场景和案例,你就会比较知道如何上手了,再让我们等一下,待我在花个几天把全部 Componenet 操作个别都介绍完之後,我们就可以全部串在一起做使用了,到时候相信你一定可以使用得很得心应手!

Reference


<<:  # Day6--一个很难驾驭的概念:闭包

>>:  【Day 06】C 的资料型态(下)

DAY8 MongoDB 批次操作(bulk wirte) 与 Operators

DAY8 MongoDB 批次操作(bulk wirte) 与 Operators bulk wri...

30-23 之 Patterns of Enterprise Application Architecture 小总结

今天我们简单来整理一下,这几个星期从 《 Patterns of Enterprise Applic...

Day 6 阿里云架设网站-迁移上云端

既然使用云端服务,首要处理的问题就是现有的服务、web的调校怎麽办?是不是需要重新在云端上架设呢?...

入门魔法 - 针对 DOM 节点的简单操作

前情提要 上回说了希望选择学习火属性魔法後,艾草带我走到一棵大树下。 艾草:「来尝试用自己现有的魔力...

【Day 10】os模组

OS模组(Python内建) 说明 : os模组是一种与作业系统相关的模组,提供数十种与作业系统沟通...