今天要介绍如何用 NiFi 来对 MongoDB 的资料做操作。MongoDB 是我们最熟悉的 Document DB 的类型,他支援的 JSON, XML 等格式,他其实对於一些文档的资料储存有一定的好处,也是现今最常用的资料库之一,所以我认为介绍 MongoDB 如何整合於 NiFi 也是有一定帮助的。那我们就赶快开始吧
一样我们要先建立好 Mongodb 的 Container,这样才能做范例。这里我也先给各位一个 docker-compose.yaml
的范例:
version: '3'
services:
nifi:
image: nifi-sample
container_name: nifi-service
restart: always
ports:
- 8443:8443/tcp
- 8080:8080/tcp
env_file: .env
environment:
SINGLE_USER_CREDENTIALS_USERNAME: ${NIFI_USERNAME}
SINGLE_USER_CREDENTIALS_PASSWORD: ${NIFI_PASSWORD}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
AWS_REGION: ${AWS_REGION}
networks:
- nifi-network
nifi-registry:
image: apache/nifi-registry:1.14.0
container_name: nifi-registry-service
restart: always
ports:
- 18080:18080/tcp
networks:
- nifi-network
mysql:
image: mongo:4.2-rc-bionic
container_name: nifi-mongo
restart: always
ports:
- 27017:27017/tcp
env_file: .env
environment:
MONGO_INITDB_ROOT_USERNAME: ${DB_ADMIN_USERNAME}
MONGO_INITDB_ROOT_PASSWORD: ${DB_ADMIN_PASSWORD}
MONGO_INITDB_DATABASE: ${DB_TABLE_NAME}
networks:
- nifi-network
networks:
nifi-network:
一样执行 docker-compose up -d
即可建立完成。
相较於前一篇的 RDB 介绍,NiFi 本身就有提供原生的 Processor 可以让我们对 MongoDB 来做操作,所以整体来说会更加简单,目前 NiFi 有的 MongoDB Processor 包含如下:
其中 GetMongoRecord
和 PutMongoRecord
这两个 Processor 主要是要让写入或读取的资料不是以 JSON format 呈现,而是可以改成 CSV 等其他格式。通常除非有特别需求才会改用这两种 Processor 来对 MongoDB 做读写,不然通常还是以 GetMongo
和 PutMongo
来做处理。
这个很单纯地就是从 MongoDB 读取资料的 Processor,我们可以详细看一下其中几个重要的设定:
mongodb://[username]:[password]@172.17.0.1:27017
如此一来,当开始执行时,他就会从 Collection 开始逐一把 Record 读出来成 FlowFiles。
此外,我们可以看到 Properties 中第一个是 Client Service
,这边可以指定 MongoDB 的 Controller Service (MongoDBControllerService
),所以我们也可以事先建立好,参考如下图:
就可以在 MongoDBControllerService
设定 MongoURI,如此一来就可以在所有 MongoDB 相关 Processors 中的 Client Service
指定这个 Controller Service,就可以建立连线了。
该 Processor 就是写入或更新资料到 MongoDB,所以前提是 FlowFiles 当中必须要有 Content 内容,这样 NiFi 才会把当中的资料转成 JSON 且写入进入,一样我们来看一下内部的设定:
前面有提到的 Client Service
, Mongo Database Name
和 Mongo Collection Name
这边就不再提一次了,我们来看一下其他的设定。
insert
和 update
两种,可依据你的情境来做选定。所以透过 PutMongo
,我们就可以很轻松地对 MongoDB 的资料做新增与写入的动作了。
该 Processor 就是顾名思义去删除 MongoDB 的资料,所以理所当然他的 Processor 设定也是一样的,如下:
其中可以看到 Delete Mode
可以让我们去决定一次删除一笔还是多笔,所以可依据我们的场景来做选定。
该 Processor 主要是用来执行 MongoDB 的 aggregation command,可以参考官方 link,就可以大概知道 aggregation 是如何实作的。
一样我们来先看一下该 Processor 的设定:
一些基本的设定都跟前面一样,其中要注意的是 Query
这个参数。
举例来说,我们原生 MongoDB 会透过像是以下指令来做 aggregation:
db.orders.count( { ord_dt: { $gt: new Date('01/01/2012') } } )
对应到 RunMongoAggregation
则在 Query 这个参数就是填入
{ "ord_dt" : { "$gt": new Date('01/01/2012') } }
就是去除掉 function,将 key 补上双引号即可,就能做到同样的效果了。
今天对於 MongoDB 那边的操作就大概介绍到这边,原则上只要把 Processor 设定好,接着在 Start 你的 datapipeline,就会看到对应的 FlowFiles 的状况,所以我仍先以 Processor 本身需要注意的设定为主,到了系列文的最後面就会有一个情境来让大家更熟悉整体的操作流程。接下来,明天就会开始介绍到 AWS 相关服务的整合,这部分我觉得是目前实务案例中最重要的,大多数企业都会将服务建置上云端上,所以我尽可能地来把这块介绍完整,包含了 AWS, GCP,也再麻烦读者们好好地追踪!
>>: Day 19 UItableView的练习 (3/3)
打了2000字消失了怎麽办呢(´・_・`) 先去上个厕所压压惊,恳请IT邦邦忙快优化界面 在编辑介面...
这篇是 Thunkable学习笔记 2 - 加入Firebase登入功能(使用EMail) 的功能加...
由於未知原因昨天的模型一直无法训练 为了Debug,今天把它改成价格预测来和这篇对照 把Loss地方...
D4: 基本四则运算 基本的加减乘除,但是会发现除法结果只会显示整数的部分 所以我的解决办法是改成f...
这是我很喜欢的主题,但今天依旧不舒服而且我队友不安ㄌ只好先断尾求生 InnoDB InnoDB 是...