Day9 NiFi - Controller Service

今天我要来介绍下一个对 NiFi 很重要的 Componenet - Controller Service。只要与第三方的平台、cloud 或 DB 等都需要透过该 Componenet 来做一个对接,才可以将我们要的资料来做一个输入或是输出。

什麽是 Controller Service?

如同一开始所提到的,有时候我们会需要从 Database、DatawareHouse 或是云端的服务来取得资料,进而透过我们在 NiFi 建立的 Data Pipeline 来做一连串的处理步骤,最後在写入。但是如果同时有多个 Data Pipieline 都需要对同一个 DB 或是一个 Datasource 建立多个连线的话,则对於另一端可能会造成不可预期的影响。所以若能透过同一一个 Object 来建立其中的连线的话,对於 Pipeline 的处理一方面更加单纯,另一方面对於提供资料的那一端也不会有太多不必要且重复的网路连线。

这边我帮各位 Controller Service 底下大概的特性与分类:

  1. 与 DB、Cloud(ex. AWS) 建立连线的设定
  • AWSCredentialsProviderControllerServcie
    可让使用者设定好 access_key_id, secret_access_key 来存取对应的 AWS 资源
  • DBConnectionPool
    使用者可以指定好 DB 的 JDBC Driver 之路径classConnection URL,就可以让 NiFi 对於 DB 来做存取。ex. MySQL, AWS Athena, AWS Redshift等

当然还有很多其他原生提供的 Controller Service,ex. CassandraSessionProvider, RedisConnectionPoolService, HiveConnectionPool, GCPCredentialsControllerService等这些都是比较常用到的,而运作原理也跟我上述的描述雷同。

  1. 读取或写入特定 format 的设定
    Controller Service 的另外一种比较常见的用法是针对特定档案格式的 Reader 和 Writer,让我们可以去利用对应的档案格式读取或写入档案:
  • Reader
    常见的有 AvroReader, CsvReader, JsonTreeReader, ParquetReader, XMLReader
  • Writer
    常见的有 AvroRecordSetWriter, CSVRecordSetWriter, JsonRecordSetWriter, ParquetRecordSetWriter, XMLRecordSetWriter

如此一来 NiFi 就可以根据不同的档案格式去解析与写入对应的资料档案。

如何操作?

接下来稍微介绍一下如何去设定 Controller Service,我们可以在主页的左方的『齿轮符号』:

接者选择 『CONTROLLER SERVICES』,且点选右手边的『+』符号:

接着就会跳出所有 Controller Service的种类,与 Processors 的选择画面雷同,我们就可以在上面选择我们要的 Controller Service:

接下来我分两个场景来简单介绍一下如何使用:

  1. AWSCredentialsProviderControllerServcie
    这部分都是采用 AWS 相关 Processor 的时候会去用到的,当我们选择这个 Controller Service 的时候,我们会在刚刚的画面看到如下结果:

光这样可是还不够的,接着我们点选右手边『齿轮』的符号,会看到如下画面:

我们会看到一些设定,最基本的我们只要去设定好 Access Key IDSecret Access Key 就可以,其中你会发现当设定完之後 Value 会变成 Sensitive Value set:

那是因为 NiFi 本身会针对一些敏感资料的属性去做隐藏以及加密,以避免被其他使用者知道真实 value 内容。

当设定完之後按下『APPLY』,我们就可以回到画面按下『闪电』的符号来作为启用:

当按下『闪电』的符号时,会看到两种设定,一个是『Service Only』,代表只单纯启用这个 Controller Service; 另一种是『Service and referencing component』,这是除了启用之外,也会把相关应用到的 Processor 状态也 start 起来。

这边我们选择最一般的『Service Only』,就会看到 State 栏位从原本的 Disabled 变成 Enabled:

最後,我们任意挑选一个关於 AWS 的 Processor,这边以『ListS3』为例:

我们会发现红框也是做同样的设定,如果每次都要设定 Access Key ID 等资讯就会相对麻烦,而且也会建立很多连线,此时我们可以在橘框选择对应刚刚设定好的 Controller Service 即可,如此一来就能做使用了。

  1. CSVReader & CSVRecordSetWriter
    这边以读取 CSV 的读取和写入作为范例,其实在上一篇的 Processor Group 就有稍微带到,但这边来做更进一步地说明,一样我们加入这两个 Controller Service:

接着可以来稍微看一下内部的设定,首先是 CSVReader:

里面是一些比较重要的设定,通常我是不太会动它,除非有像是分隔符号改变、schema 改变等才会去动到他。
这边我也来提一下每一个 Value 所代表的意思:

* Schema Access Strategy: Infer Schema
代表使用档案的栏位名称与资料格式作为 Schema
* Value Separator: 分隔符号(default: `,`)
* Record Separator: Record 的分隔符号 (default: `\n`)
* `Treat First line as Header`: 是否要将第一行视为 Header
* Quote Character: 用双引号来包字串
* Escape Character: 对特定符号解析的 character

CSVRecordSetWriter 也是类似的设定:

接着我们一样启用他,那 State 变成 Enabled:

最後我们拿 『SplitRecord』 来做范例,我们可以看到底下就可以设定这两个 Controller Service,代表 FlowFiles 会以 CSV 的格式来读取且再经由 CSV 的格式转换写入:

小总结

其他相关的用法其实都是大同小异,通常我也是根据我的应用场境来决定适用的 Controller Service,再搭配文件的说明来做适当的设定,只是若能擅长用 Controller Service,对於 Team 来说其实也比较好控管这个对接第三方的 Connection 设定,在後续的场境也会用到其他 Controller Service,期望读者们能先透过这篇有一个基本的认识与操作。

明天要来讲 - Templates, Lables & Funnel,很快就要第10天了,我们再一起努力一下,一定收获良多的!

Reference


<<:  关於多型

>>:  如果你竭尽全力,就不会有时间担心失败。

Binary Search

二元搜寻BigO(log n) 相较於线性搜寻时间复杂度实在好太多 必须是被排序好的 由於每次对半砍...

[Lesson28] Kotlin - Generics

泛型就是参数化类型,将类别参数化。让你在定义类别、方法、介面时先不用决定型别,等到要实体化时再决定型...

二元树左到右查找 - DAY 16

前序检查(preorder) 中序检查(inorder) 後序检查(postorder) 後序检查来...

第12天 - 用 PHP Session 与 Bootstrap 做警告(提示)

今天用来使用 Session 做警告(提示),它可以用来让使用者知道他的动作使否有成功(例如修改成功...

连续 30 天 玩玩看 ProtoPie - Day 6

终於要从 Beginner 迈向 Intermediate 了。 这次的讲者讲话好清楚,转 1.75...