【Day29-管线】ETL资料工程的必备观念与流程工具Dagster介绍

终於到了倒数第二天了,那前一天我们介绍了用爬虫作为资料获取的手段,那今天就来稍微讲解一下有了资料之後需要怎样进行资料流(data pipeline)的管理吧

ETL概念

什麽是ETL?

ETL是一个缩写,一共由以下几个部分所组成

  • Extract(抽取)
  • Transform(转换)
  • Load(载入)


图片来源:Microsoft

通常是用在资料的仓储与管理,在越大量复杂的资料就越适合使用这种概念进行管理资料。

ETL有什麽帮助?

一笔原始的数据,通常会需要经过许多不同步骤的处理才会送入到我们最终需要的地方。而有的时候比起实作上是怎麽达成的细节,我们通常会更加在意这些经过不同步骤的阶段的资料。因此将资料流在过程中以ETL的方式进行拆解和管理会可以更加有效的去维护和开发。

Dagster

Dagster是什麽?

  • 一款可以定义资料元之间的依赖关系和流程的工具
  • 可通过 API 查询、操作和监控。
  • 除了task执行的依赖性以外,另外针对data也建立依赖关系。
  • 可以在支援多种其他工具或语言搭配使用
    • Spark
    • Python
    • SQL
    • Jupyter 
    • locally
    • CI/CD pipelines
    • staging
    • production
  • 在python内用decorater建立流程
  • 拥有很漂亮的Web UI介面用来操控和监视


Web介面,图片来源:Dagster

安装

安装方式非常简单,只要用pip像是安装一般python套件一样即可

pip install dagster dagit
  • Dagster:核心元件,以及包含可以用命令行执行的工具
  • Dagit:主要是Web UI的部分

基本使用

建立Dagster流程

我们建立一份Day29-1.py的档案,内容如下

from dagster import execute_pipeline, pipeline, solid


@solid
def get_name():
    return 'dagster'


@solid
def hello(something):
    print(something)


@pipeline
def hello_pipeline():
    hello(get_name())

这边我们一共用了两种decrorater,其中

  • solid:用来建立资料流的基本节点
  • pipeline:在用来呼叫不同的Solid节点,Dagster会自动帮我们建立流程

开启Dagit web介面

在Terminal输入

dagit -f Day29-1.py

以单档方式打开这份流程,然後开启localhost:3000即可进入介面

Dagster帮忙建立好的流程图

在Web介面上执行

有许多不同的执行流程显示方式

进阶一点点,执行时调整参数

dagster可以把一些参数留到Web UI的介面调整,因此我们这边就来试看看,建立一份Day29-2.py

from dagster import pipeline, solid, InputDefinition, OutputDefinition, PresetDefinition, ModeDefinition


@solid(
    config_schema={"input": str}, # config输入型态的检查
    description="说明文字",

)
def string_from_config(context): 
    text = context.solid_config["input"] # 参数在.solid_config底下的dict
    return text


@solid(
    input_defs=[InputDefinition("input_string", dagster_type = str)], # 输入变数的名称和形态
) 
def show(context, input_string):
    context.log.info(input_string)
    return 1


@pipeline(
    mode_defs=[
        ModeDefinition(
            name="default1",
        ),
        ModeDefinition(
            name="default2",
        ),
    ],
    preset_defs=[
        # 预设config用
        PresetDefinition(
            "default1",
            run_config={
                "solids": {
                    "string_from_config": {"config": {"input": ":default string 1:"}}
                }
            },
            mode="default1",
        ),
        # 第二笔config
        PresetDefinition(
            "default2",
            run_config={
                "solids": {
                    "string_from_config": {"config": {"input": ":default string 2:"}}
                }
            },
            mode="default2",
        ),
    ],
)
def my_pipeline():
    show(string_from_config())

进入Playground会可以交出已经配置好的参数,或是自己另外更改

而结果也会随着前面的配置而改动

结语

当初看到的时候是觉得这东西真的挺酷的,虽然上手门槛不低而且需要特地去适应他的逻辑,但习惯之後真的对一些资料流程专案的设计管理上是蛮有帮助的~


<<:  【Day 29】- 应对反爬虫技术-综合篇

>>:  连续 30 天 玩玩看 ProtoPie - Day 29

Day_21 DNS/DDNS/Port Forwards (二)

昨日DNS/DDNS/Port Forwards (一)中提到固定实体IP的设定,但如果你用的是浮动...

Day 1 - 前言与介绍

Day1-前言与介绍 大家好,我是来自南港高中科学班的白秉轩,我目前三年级,是个准学测生,因为同学的...

Day11 - 敏捷式接案实践( 三 ) - 时间管理

自从改变自己的工作模式後,现在平均的案量是每天有计时的工作时数大概是三个小时,并且同时有三位长期配合...

成员 10 人:半夜加班,毛骨悚然的诡故事

人员终於跨入两位数,虽然只是个不明显的里程碑; 但也准备从「求生存」往「求发展」的路程前进。 这时候...

网路卡卡的介於通与不通之间 (事情做不了了啦) ~

面临这种网路不通问题如何解决呢 ? 网路不通你可以如此做 : 运用查线器确认一下 , 1 – 8 ...