Day16 NiFi - 与 MongoDB 对接设定

今天要介绍如何用 NiFi 来对 MongoDB 的资料做操作。MongoDB 是我们最熟悉的 Document DB 的类型,他支援的 JSON, XML 等格式,他其实对於一些文档的资料储存有一定的好处,也是现今最常用的资料库之一,所以我认为介绍 MongoDB 如何整合於 NiFi 也是有一定帮助的。那我们就赶快开始吧

Build MongoDB Container

一样我们要先建立好 Mongodb 的 Container,这样才能做范例。这里我也先给各位一个 docker-compose.yaml 的范例:

version: '3'

services:
    nifi:
        image: nifi-sample
        container_name: nifi-service
        restart: always
        ports:
            - 8443:8443/tcp
            - 8080:8080/tcp
        env_file: .env
        environment:
            SINGLE_USER_CREDENTIALS_USERNAME: ${NIFI_USERNAME}
            SINGLE_USER_CREDENTIALS_PASSWORD: ${NIFI_PASSWORD}
            AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
            AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
            AWS_REGION: ${AWS_REGION}
        networks:
            - nifi-network

    nifi-registry:
        image: apache/nifi-registry:1.14.0
        container_name: nifi-registry-service
        restart: always
        ports:
            - 18080:18080/tcp
        networks:
            - nifi-network
    
    mysql:
        image: mongo:4.2-rc-bionic
        container_name: nifi-mongo
        restart: always
        ports:
            - 27017:27017/tcp
        env_file: .env
        environment:
            MONGO_INITDB_ROOT_USERNAME: ${DB_ADMIN_USERNAME}
            MONGO_INITDB_ROOT_PASSWORD: ${DB_ADMIN_PASSWORD}
            MONGO_INITDB_DATABASE: ${DB_TABLE_NAME}
        networks:
            - nifi-network
networks:
    nifi-network:

一样执行 docker-compose up -d 即可建立完成。

How to use?

相较於前一篇的 RDB 介绍,NiFi 本身就有提供原生的 Processor 可以让我们对 MongoDB 来做操作,所以整体来说会更加简单,目前 NiFi 有的 MongoDB Processor 包含如下:

  • GetMongo
  • GetMongoRecord
  • PutMongo
  • PutMongoRecord
  • RunMongoAggregation
  • DeleteMongo

其中 GetMongoRecordPutMongoRecord 这两个 Processor 主要是要让写入或读取的资料不是以 JSON format 呈现,而是可以改成 CSV 等其他格式。通常除非有特别需求才会改用这两种 Processor 来对 MongoDB 做读写,不然通常还是以 GetMongoPutMongo 来做处理。

GetMongo

这个很单纯地就是从 MongoDB 读取资料的 Processor,我们可以详细看一下其中几个重要的设定:

  • Mongo URI:
    这边就是输入 Mongo 原生的 URI,官方有提供相关的 URI format,可点该 link,这边的范例是 mongodb://[username]:[password]@172.17.0.1:27017
  • Mongo Database Name:
    一样输入 Database 名称。
  • Mongo Collection Name:
    一样输入 Collection 名称。

如此一来,当开始执行时,他就会从 Collection 开始逐一把 Record 读出来成 FlowFiles。

此外,我们可以看到 Properties 中第一个是 Client Service,这边可以指定 MongoDB 的 Controller Service (MongoDBControllerService),所以我们也可以事先建立好,参考如下图:


就可以在 MongoDBControllerService 设定 MongoURI,如此一来就可以在所有 MongoDB 相关 Processors 中的 Client Service 指定这个 Controller Service,就可以建立连线了。

PutMongo

该 Processor 就是写入或更新资料到 MongoDB,所以前提是 FlowFiles 当中必须要有 Content 内容,这样 NiFi 才会把当中的资料转成 JSON 且写入进入,一样我们来看一下内部的设定:

前面有提到的 Client Service, Mongo Database NameMongo Collection Name 这边就不再提一次了,我们来看一下其他的设定。

  • Mode:
    目前支援 insertupdate 两种,可依据你的情境来做选定。
  • Upsert:
    简单来说是否开启这功能,若开启他会自动侦测如果资料不存在就写入,有存在就更新。
  • Update Query Key:
    用来根据某一个 key 来做更新。
  • Update Query:
    针对全部的 Record 来做更新,没有依据某一个 Key 的条件。

所以透过 PutMongo,我们就可以很轻松地对 MongoDB 的资料做新增与写入的动作了。

DeleteMongo

该 Processor 就是顾名思义去删除 MongoDB 的资料,所以理所当然他的 Processor 设定也是一样的,如下:

其中可以看到 Delete Mode 可以让我们去决定一次删除一笔还是多笔,所以可依据我们的场景来做选定。

RunMongoAggregation

该 Processor 主要是用来执行 MongoDB 的 aggregation command,可以参考官方 link,就可以大概知道 aggregation 是如何实作的。

一样我们来先看一下该 Processor 的设定:

一些基本的设定都跟前面一样,其中要注意的是 Query 这个参数。
举例来说,我们原生 MongoDB 会透过像是以下指令来做 aggregation:

db.orders.count( { ord_dt: { $gt: new Date('01/01/2012') } } )

对应到 RunMongoAggregation 则在 Query 这个参数就是填入

{ "ord_dt" : { "$gt": new Date('01/01/2012') } }

就是去除掉 function,将 key 补上双引号即可,就能做到同样的效果了。

小总结

今天对於 MongoDB 那边的操作就大概介绍到这边,原则上只要把 Processor 设定好,接着在 Start 你的 datapipeline,就会看到对应的 FlowFiles 的状况,所以我仍先以 Processor 本身需要注意的设定为主,到了系列文的最後面就会有一个情境来让大家更熟悉整体的操作流程。接下来,明天就会开始介绍到 AWS 相关服务的整合,这部分我觉得是目前实务案例中最重要的,大多数企业都会将服务建置上云端上,所以我尽可能地来把这块介绍完整,包含了 AWS, GCP,也再麻烦读者们好好地追踪!

Reference


<<:  Day 16 - Spring Boot 资料验证

>>:  Day 19 UItableView的练习 (3/3)

Chapter3 - 动感DJ续篇 进一步操作阵列,让音乐嗨起来

打了2000字消失了怎麽办呢(´・_・`) 先去上个厕所压压惊,恳请IT邦邦忙快优化界面 在编辑介面...

Thunkable学习笔记 4 - 变数(Firebase EMail登入的延伸)

这篇是 Thunkable学习笔记 2 - 加入Firebase登入功能(使用EMail) 的功能加...

[Day 29] 试验看看Regression方法

由於未知原因昨天的模型一直无法训练 为了Debug,今天把它改成价格预测来和这篇对照 把Loss地方...

D4. 学习基础C、C++语言

D4: 基本四则运算 基本的加减乘除,但是会发现除法结果只会显示整数的部分 所以我的解决办法是改成f...

【Day 16】InnoDB indexing

这是我很喜欢的主题,但今天依旧不舒服而且我队友不安ㄌ只好先断尾求生 InnoDB InnoDB 是...