Day 25 Azure machine learning: Pipeline for data- 建立工作流程来收集资料

Azure machine learning: Pipeline for data- 建立工作流程来收集资料

Pipeline,流水线或管线,顾名思义,就是让程序码,按照使用者安排的顺序一一执行,Azure machine learning 也有提供这样的服务,所以我们可以把前面几篇文章所做的事情,全部交给流水线执行。第一件事情,要先解决资料更新的问题,取得最新资料,并且将其存进 datastore,然後注册这份资料。在此,我们继续以汇率为例,来示范如何用pipeline更新资料。

执行pipeline一样至少需要两份以上的 py 档,将需要执行的任务,分别写成不同的 py 档,然後再整合在一起交由pipeline执行。

  • get_currency.py:之前在上传资料时介绍过,这次要以维护资料的角度改写。
  • run_pipeline_data.py:在本地端执行,便可把get_currency.py上传到workspace执行。

安装Python套件

请在本机端安装

pip3.7 install azureml-pipeline

示范程序

get_currency.py

import argparse
from datetime import datetime
import os
import pickle
import pandas as pd
import investpy
from sklearn.preprocessing import MinMaxScaler


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--history", type=bool, default=False)
    parser.add_argument("--target_folder", type=str)
    parser.add_argument("--input_folder", type=str)
    args = parser.parse_args()
    return args

# 以 history 这个变数作为区隔的依据,history 为 True,则是在本地端执行;反之,则是利用`pipeline`执行
def main():
    args = parse_args()
    if args.history:
        if not os.path.isdir("currency"):
            os.system("mkdir currency")

        usd_twd = investpy.get_currency_cross_historical_data(
            "USD/TWD",
            from_date="01/01/1900",
            to_date=datetime.now().strftime("%d/%m/%Y"),
        )
        usd_twd.reset_index(inplace=True)
        usd_twd.to_csv("currency/usd_twd.csv", index=False)
        currency_data = usd_twd.Close.values.reshape(-1, 1)
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaler.fit(currency_data)
        with open("currency/scaler.pickle", "wb") as f_h:
            pickle.dump(scaler, f_h)
        f_h.close()
        currency_data = usd_twd[
            (usd_twd.Date >= "2010-01-01") & (usd_twd.Date < "2021-01-01")
        ]
        currency_data.to_csv("currency/training_data.csv")
    # 以上都跟之前一模一样
    
    else:
        # 目标是从 input_path 取得旧的资料,与最新资料结合,将更新的结果存进 path。
        path = os.path.join(args.target_folder, "usd_twd.csv")
        input_path = os.path.join(args.input_folder, "usd_twd.csv")
        history = pd.read_csv(input_path)

        recent = investpy.get_currency_cross_recent_data("USD/TWD")
        recent.reset_index(inplace=True)
        history = history.append(recent, ignore_index=True)
        history.drop_duplicates(subset="Date", keep="last", inplace=True)
        history.to_csv(path, index=False)
        # 将最近 2400 天的资料作为训练用的资料
        history = history.tail(2400)
        history.to_csv(
            os.path.join(args.target_folder, "training_data.csv"), index=False
        )
        
        # 接着就必须要注册资料,资料才会真的更新。
        run = Run.get_context()
        work_space = run.experiment.workspace
        datastore = work_space.get_default_datastore()
        dataset = Dataset.File.from_files(path=(datastore, 'currency'))
        dataset.register(work_space, name='currency')

if __name__ == "__main__":
    main()

要注意的是,虽然在这边要做的事情是,读取旧资料,合并新资料後,更新资料。但是,输入的资料夹路径和输出的资料夹路径不能为同一个路径,否则就会得到错误讯息:Graph shouldn't have cycles。可以利用OutputFileDatasetConfig这个class解决这个问题,让资料先暂存,之後再推向 datastore。

run_pipeline_data.py

import os
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import Pipeline


def main():
    # 起手式,一定要先取得 workspace 权限
    interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
    work_space = Workspace.from_config(auth=interactive_auth)
    datastore = work_space.get_default_datastore()
    # 设定 input folder 
    input_folder = (
        Dataset.File.from_files(path=(datastore, "currency"))
        .as_named_input("input_folder")
        .as_mount()
    )
    # 设定 output 路径
    dataset = (
        OutputFileDatasetConfig(name="usd_twd", destination=(datastore, "currency"))
        .as_upload(overwrite=True)
        .register_on_complete(name="currency")
    )
    
    # 选择之前注册的环境
    aml_run_config = RunConfiguration()
    environment = work_space.environments["train_lstm"]
    aml_run_config.environment = environment
    
    # 设定管线中的步骤,把会用到的 py 档、输入和输出的资料夹带入
    get_currency = PythonScriptStep(
        name="get_currency",
        script_name="get_currency.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=[
            "--target_folder",
            dataset,
            "--input",
            input_folder,
        ],
        allow_reuse=True,
    )
    # pipeline 的本质还是实验,所以需要建立实验,再把 pipeline带入
    experiment = Experiment(work_space, "get_currency")

    pipeline = Pipeline(workspace=work_space, steps=[get_currency])
    run = experiment.submit(pipeline)
    run.wait_for_completion(show_output=True)
    # 执行终了,发布 pipeline,以便可以重复使用 
    run.publish_pipeline(
        name="get_currency_pipeline",
        description="Get currency with pipeline",
        version="1.0",
    )


if __name__ == "__main__":
    main()

然後,执行python3.7 run_pipeline_data.py。执行的结果可以,直接到workspace的实验页面查询。点选刚刚执行的管线,可以看到执行的过程被画成流程图。

点进步骤,再点选执行完的步骤,则会看到该实验的各种细节,也方便後续除错。


这篇算是让我搞懂pipeline,下一篇要再加上模型训练和服务部署,完成一条龙服务。


<<:  [Day 11] -『 GO语言学习笔记』- switch 叙述

>>:  要上传档案,你需要知道的事-stream binary

[Day 11] 让tinyML听见你的呼唤

在先前[Day 09] tinyML开胃菜Arduino IDE上桌(下)已经简单介绍过Arduin...

[DAY 27] 利用Python程序码让机器人走出隧道2

前言 大家好,昨天我们稍微讲解了一下在笔者团队的研究中所需的前置步骤,那麽今天我们就来进入程序码的部...

小学生学程序设计 Day 27:「夜市的鸡蛋糕」

嘿~~ 各位好,我是菜市场阿龙! 这集要介绍的是「物件导向程序设计」 频道:https://www....

Day 29. slate × Transforms × Selection & Text

上一篇我们统整了 NodeTransforms 里各个 methods 的用法以及参数介绍, 传送...

全端入门Day28_後端程序撰写之一点的Golang

昨天Go弄出了Hello World,今天就来解释怎麽写的。 Golang入门一点 先贴上昨天写的程...