今天开始会有两天来介绍简单的应用场景,会建置完整的 NiFi Data Pipeline 来让大家知道过程中的流程。首先第一天,先来个简单的场景:
假设有个 csv 档案在 AWS S3,我们希望从 s3 下载下来经过简单的处理,接着以 parquet 的方式储存在 local 的路径上。
DigiDB_digimonlist.csv
即可 ,且上传到 s3 指定好的路径做存放。我们下载完资料且上传到 s3 之後,接下来就要先设想我们要执行的资料前处理,这里先简单列给各位一个范例,那各位读者们可以再从这里去做更多的延伸:
Stage
栏位转换成数值先简单以上这两个处理就好,处理完接着储存到 local 的某一个路径上,再以 parquet 格式储存。
这个范例大致上的 Flow 会长得如下图:
接下来来带各位来看一下每一个 Processor 的设定。
这边记得指定好你的 Bucket, Region, Prefix 和 AWS Crendentials Provider service 的参数
设定好之後就会列出你指定的 Prefix folder 下的档案。
接着透过 FetchS3Object 来取得真实的档案内容。
这个 Processor 是可以让我们透过 SQL 的方式来取得要的资料和转换,先看下图设定:
可以来看一下底下的 SQL:
select Number, Digimon,
CASE WHEN Stage='Baby' THEN '0'
WHEN Stage='In-Training' THEN '1'
WHEN Stage='Rookie' THEN '2'
WHEN Stage='Champion' THEN '3'
WHEN Stage='Ultimate' THEN '4'
WHEN Stage='Mega' THEN '5'
WHEN Stage='Ultra' THEN '6'
WHEN Stage='Armor' THEN '7'
end Stage, Type
from FLowfile
简单来说,我从 FLowfile 取得我要的四个栏位,接着对於 Stage
这个栏位去做一个转换。
然而,我下游的 Processor 这必须要用 statement 作为 connection,如此一来处理过後的资料才能流到下游 Processor。
这边是我用来更新 filename
这个 attribute 的 value,因为下一个 PutFile 会预设去读这个 attributes 去作为写入档案的命名,这边以 result.parquet
。
透过这个 Processor 就可以指定写入的路径,这边以 /tmp/data/
这个 folder 为例,注意必须事先有该 folder 存在,否则会写入错误。
如此一来,就会将处理完的资料写成 local 的 /tmp/data/result.parquet
即可完成。
最後进到 container 的 terminal,就可以看到确实有产生对应的 parquet file。
接着,我们透过 Processor 的 View data Provenance,就可以看到处理完後的 Content 的状况:
以上是第一个小范例的操作,很简单地5个 Processors 就可以达到我们想要的效果,明天会再利用这个 file 再做一下步地延伸与范例。
>>: 冒险村26 - Design Pattern(6) - Form Object
什麽是 Periodic Background Sync API 透过在 service worke...
常用的标签 先来介绍一些常用的标签 h1、h2、h3、h4、h5、h6 ← 这六个是依序由大到小的标...
Chap.I 理论基础 Part 2:微积分 4. Critical Points and Opti...
问题: 照着https://wiki.debian.org/QEMU 的教学 输入这两行指令没问题 ...
Sum of a sequence our task is to make function, wh...