Day26 NiFi 场景应用范例 (一)

今天开始会有两天来介绍简单的应用场景,会建置完整的 NiFi Data Pipeline 来让大家知道过程中的流程。首先第一天,先来个简单的场景:

假设有个 csv 档案在 AWS S3,我们希望从 s3 下载下来经过简单的处理,接着以 parquet 的方式储存在 local 的路径上。

事前准备

  1. 我们先下载一个 kaggle - Digimon Database: A database of Digimon and their moves from Digimon Story CyberSleuth 的资料,只要 DigiDB_digimonlist.csv 即可 ,且上传到 s3 指定好的路径做存放。
  2. 设定好 AWS Credential 的 Controller Service,好让後续的 Processor 可以做使用,详细设定参考 Day9 NiFi - Controller Service

What is data preprocessing about example data?

我们下载完资料且上传到 s3 之後,接下来就要先设想我们要执行的资料前处理,这里先简单列给各位一个范例,那各位读者们可以再从这里去做更多的延伸:

  1. Stage 栏位转换成数值
  2. 只保留 Number, Digimon, Stage, Type 四个栏位

先简单以上这两个处理就好,处理完接着储存到 local 的某一个路径上,再以 parquet 格式储存。

How to build?

这个范例大致上的 Flow 会长得如下图:

接下来来带各位来看一下每一个 Processor 的设定。

ListS3


这边记得指定好你的 Bucket, Region, PrefixAWS Crendentials Provider service 的参数

设定好之後就会列出你指定的 Prefix folder 下的档案。

FetchS3Object


接着透过 FetchS3Object 来取得真实的档案内容。

QueryRecord

这个 Processor 是可以让我们透过 SQL 的方式来取得要的资料和转换,先看下图设定:

  • Record Reader: 代表要用什麽格式读取,这边我选择用 CSVReader。
  • Record Writer: 代表从该 Processor 流出去的 Content 会是什麽格式,这边依据场景假设选用 ParquetRecordSerWritter。
  • statement: 这边是我新加入的 parameters,就是我要处理的 SQL

可以来看一下底下的 SQL:

select Number, Digimon,
    CASE WHEN Stage='Baby' THEN '0'
         WHEN Stage='In-Training' THEN '1'
         WHEN Stage='Rookie' THEN '2'
         WHEN Stage='Champion' THEN '3'
         WHEN Stage='Ultimate' THEN '4'
         WHEN Stage='Mega' THEN '5'
         WHEN Stage='Ultra' THEN '6'
         WHEN Stage='Armor' THEN '7'
    end Stage, Type
from FLowfile

简单来说,我从 FLowfile 取得我要的四个栏位,接着对於 Stage 这个栏位去做一个转换。

然而,我下游的 Processor 这必须要用 statement 作为 connection,如此一来处理过後的资料才能流到下游 Processor。

UpdateAttribute


这边是我用来更新 filename 这个 attribute 的 value,因为下一个 PutFile 会预设去读这个 attributes 去作为写入档案的命名,这边以 result.parquet

PutFile


透过这个 Processor 就可以指定写入的路径,这边以 /tmp/data/ 这个 folder 为例,注意必须事先有该 folder 存在,否则会写入错误。

如此一来,就会将处理完的资料写成 local 的 /tmp/data/result.parquet 即可完成。

Running Result

最後进到 container 的 terminal,就可以看到确实有产生对应的 parquet file。

接着,我们透过 Processor 的 View data Provenance,就可以看到处理完後的 Content 的状况:

小总结

以上是第一个小范例的操作,很简单地5个 Processors 就可以达到我们想要的效果,明天会再利用这个 file 再做一下步地延伸与范例。

Reference


<<:  表单处理 Object 里的 Object

>>:  冒险村26 - Design Pattern(6) - Form Object

Progressive Web App 定期背景同步 (19)

什麽是 Periodic Background Sync API 透过在 service worke...

Day04常用的基本标签(HTML)

常用的标签 先来介绍一些常用的标签 h1、h2、h3、h4、h5、h6 ← 这六个是依序由大到小的标...

Python 演算法 Day 4 - 理论基础 微积分

Chap.I 理论基础 Part 2:微积分 4. Critical Points and Opti...

解决QEMU:Failed to open module: ........的问题

问题: 照着https://wiki.debian.org/QEMU 的教学 输入这两行指令没问题 ...

[Kata] Clojure - Day 28

Sum of a sequence our task is to make function, wh...