Day25 NiFi - 第三方程序执行

在过往的介绍篇幅当中,我们几乎都是透过 NiFi 原生的 Processor 来做处理,甚至整合一些外部的 JDBC,但除了这些方法之外,我们也可以透过我们熟悉的程序语言来做辅助,像是 python, java, golang 等来帮我们针对 FlowFiles 来做处理,今天会介绍一下如何在 NiFi 撰写这些程序语言的设计。

How to integrate program?

NiFi 要如何来撰写这些程序的 script 呢?主要有两个 Processor 可以做到,分别是 ExecuteScriptExecuteStreamCommand

ExecuteScript

这个 Processor 通常会有内建好以支援的程序,如下图:

我们可以在 Script Engine 看到有像是 Groovy, lua, python 等,像我自己是只用过 python 的 Script Engine 来做处理,所以我这边稍微提一下注意的地方。

当你如果选择 python 的话,要注意的是这边的程序撰写并不是纯 python,而是 jython。因为在一开始有介绍到 NiFi 是一个由 java 做为底层的开发语言,所以在这中间的转换如果需要写 code 来做处理的话,以 python 为例则需要以 jython 来做为撰写。

那 jython 其实不难,详情可以参考该连结,大致上的结构与逻辑与 python 差不多,但由於 jython 是由 java 语言所撰写的 python 直译器,所以会需要事先 import java 可能会用到的 lib,才能在 NiFi 顺利地执行下去。

然而指定完 Script Engine 之後,倘若你有事先写好的 script file,那就在 Script File 去指定 Script 的路径; 如果没有则在 Script Body 直接输入要执行的 code 内容,如下图:

这边贴一下网路的 jython 的 example code 上来:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import OutputStreamCallback
 
# Define a subclass of OutputStreamCallback for use in session.write()
class PyOutputStreamCallback(OutputStreamCallback):
  def __init__(self):
        pass
  def process(self, outputStream):
    outputStream.write(bytearray('Hello World!'.encode('utf-8')))
# end class
flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyOutputStreamCallback())
session.transfer(flowFile, REL_SUCCESS)

简单来说,就是将流进来该 Processor 的 FlowFiles 之 Content,多了 Hello World! 的内容,接着来流出去给下游 Processor。

所以我们可以发现在 import 的那一段会需要用到 java 底层的 lib 来作为辅助,然而取得 FlowFiles 的方式则需要透过 session.get() 来取得,最後再 transfer 到下游。

ExecuteStreamCommand

这时候你可能会觉得,但我就是想要用原生的 python 来撰写跟辅助,请问 NiFi 可以怎麽用呢?这时候就可以透过 ExecuteStreamCommand 这个 Processor 来做使用。

首先,我们先来看一下一个大致上的设定:

  • Command Path: 要执行的程序路径,ex. /usr/lib/python3
  • Command Arguments: 要带入到 script 的参数
  • Argument Delimiter:参数的分隔符号,上图采用空白键
  • Output Destination Attribute:传出来的结果要存到 FlowFiles 的哪一个 key

这边我们就可以事先写好真的原生的 python script,这部分就不需要写用 jython 来做处理,只要你在 Command Path 指定好 python 的执行路径,你就可以执行 python script。

其中比较特别的是 Output Destination Attribute,假设你在你的 python script 是以下的内容:

import os
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--arg1', default=5, type=int)
    parser.add_argument('--arg2', default=1, type=int)
    args = parser.parse_args()
    
    count = args.arg1 + args.arg2
    
    print(count) ### it will be output as attribute
    
if __name__ == '__main__':
    main()

从这段 python code,我们可以看到就是一个很简单的传入两个 args,接着 print 出加总的结果。而假如我们在 Output Destination Attribute 设定为 output_res,则该 Processor 就会把 print 的结果设定为 output_res 这个 key 的 value,也就是说经过该 Processor 的所有 FlowFiles 都会多带有一个名为 output_res 的 attribute,其 value 就是在 script print 的资讯内容。

所以这个 Processor 的用法要留意一下,通常我的经验是用在我可能有一些复杂的逻辑是需要透过 python 来做处理,且处理後的结果只有一个,所以我在整个 script 内只会有一个地方有 print 的 code,我再把那个 print 的 value 导向到我设定的 attribute name,如此一来我下游的 Processor 就可以采用计算处理完的 value 去做接下来的 task。

小总结

这两个 Processor 是目前在 NiFi 中可以透过第三方的程序语言来做辅助的 Processor,我们会依照使用场境来选择适当的处理方式,例如如果你是需要用到 FlowFiles 本身的 Attributes 或 Content 来做应用的话,通常会比较建议采用 ExecuteScript 的 Processor,因为就可以透过 session.get() 来取得。

但如果只是很单纯额外增加 attribute 或是更新的话,其实透过 ExecuteStreamCommand 来处理就好了,因为我们可以对 attribute 来做操作。

以上是我对於这两个 Processor 的操作经验,或许有更好的用法也说不定,只是真的会用到的机会较少,但这边我也一样分享给大家这样的做法,倘若读者们有新的发现,也欢迎来一起讨论交流喔。

Reference


<<:  【Day 28】Self - defined Data Types

>>:  [从0到1] C#小乳牛 练成基础程序逻辑 Day 25 - String操纵术II

Day 28. 测试HTTP Status Code

使用SuperTest 使用SSR时,你要负责回应正确的HTTP Status Code。 因为牵涉...

D29 Selenium 自动注册帐户

研究了半天 写了一个自动注册帐户的程序 如下 首先会读取chromedriver 开启後转到注册页面...

[Day29] Bevy 游戏引擎 (Part 3) 收工

好 那今天就是专案的收尾了 我先预告一下明天会把一些我从开始学习 Rust 之後陆续得到的学习资源 ...

[Day04] Vue i18n - Pluralization

在本地化 (localize) 文字讯息时,我们可能会遇到某些语言会有复数型态的状况 (最常见的就是...

Node套件运用测试

因为我看到说node可以利用套件来使撰写程序较简单方便些,所以我这边就想用express套件来做个简...