Pipeline
,流水线或管线,顾名思义,就是让程序码,按照使用者安排的顺序一一执行,Azure machine learning 也有提供这样的服务,所以我们可以把前面几篇文章所做的事情,全部交给流水线执行。第一件事情,要先解决资料更新的问题,取得最新资料,并且将其存进 datastore,然後注册这份资料。在此,我们继续以汇率为例,来示范如何用pipeline
更新资料。
执行pipeline
一样至少需要两份以上的 py 档,将需要执行的任务,分别写成不同的 py 档,然後再整合在一起交由pipeline
执行。
get_currency.py
:之前在上传资料时介绍过,这次要以维护资料的角度改写。run_pipeline_data.py
:在本地端执行,便可把get_currency.py
上传到workspace
执行。Python
套件请在本机端安装
pip3.7 install azureml-pipeline
get_currency.py
import argparse
from datetime import datetime
import os
import pickle
import pandas as pd
import investpy
from sklearn.preprocessing import MinMaxScaler
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--history", type=bool, default=False)
parser.add_argument("--target_folder", type=str)
parser.add_argument("--input_folder", type=str)
args = parser.parse_args()
return args
# 以 history 这个变数作为区隔的依据,history 为 True,则是在本地端执行;反之,则是利用`pipeline`执行
def main():
args = parse_args()
if args.history:
if not os.path.isdir("currency"):
os.system("mkdir currency")
usd_twd = investpy.get_currency_cross_historical_data(
"USD/TWD",
from_date="01/01/1900",
to_date=datetime.now().strftime("%d/%m/%Y"),
)
usd_twd.reset_index(inplace=True)
usd_twd.to_csv("currency/usd_twd.csv", index=False)
currency_data = usd_twd.Close.values.reshape(-1, 1)
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(currency_data)
with open("currency/scaler.pickle", "wb") as f_h:
pickle.dump(scaler, f_h)
f_h.close()
currency_data = usd_twd[
(usd_twd.Date >= "2010-01-01") & (usd_twd.Date < "2021-01-01")
]
currency_data.to_csv("currency/training_data.csv")
# 以上都跟之前一模一样
else:
# 目标是从 input_path 取得旧的资料,与最新资料结合,将更新的结果存进 path。
path = os.path.join(args.target_folder, "usd_twd.csv")
input_path = os.path.join(args.input_folder, "usd_twd.csv")
history = pd.read_csv(input_path)
recent = investpy.get_currency_cross_recent_data("USD/TWD")
recent.reset_index(inplace=True)
history = history.append(recent, ignore_index=True)
history.drop_duplicates(subset="Date", keep="last", inplace=True)
history.to_csv(path, index=False)
# 将最近 2400 天的资料作为训练用的资料
history = history.tail(2400)
history.to_csv(
os.path.join(args.target_folder, "training_data.csv"), index=False
)
# 接着就必须要注册资料,资料才会真的更新。
run = Run.get_context()
work_space = run.experiment.workspace
datastore = work_space.get_default_datastore()
dataset = Dataset.File.from_files(path=(datastore, 'currency'))
dataset.register(work_space, name='currency')
if __name__ == "__main__":
main()
要注意的是,虽然在这边要做的事情是,读取旧资料,合并新资料後,更新资料。但是,输入的资料夹路径和输出的资料夹路径不能为同一个路径,否则就会得到错误讯息:Graph shouldn't have cycles。可以利用OutputFileDatasetConfig
这个class
解决这个问题,让资料先暂存,之後再推向 datastore。
run_pipeline_data.py
import os
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import Pipeline
def main():
# 起手式,一定要先取得 workspace 权限
interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
work_space = Workspace.from_config(auth=interactive_auth)
datastore = work_space.get_default_datastore()
# 设定 input folder
input_folder = (
Dataset.File.from_files(path=(datastore, "currency"))
.as_named_input("input_folder")
.as_mount()
)
# 设定 output 路径
dataset = (
OutputFileDatasetConfig(name="usd_twd", destination=(datastore, "currency"))
.as_upload(overwrite=True)
.register_on_complete(name="currency")
)
# 选择之前注册的环境
aml_run_config = RunConfiguration()
environment = work_space.environments["train_lstm"]
aml_run_config.environment = environment
# 设定管线中的步骤,把会用到的 py 档、输入和输出的资料夹带入
get_currency = PythonScriptStep(
name="get_currency",
script_name="get_currency.py",
compute_target="cpu-cluster",
runconfig=aml_run_config,
arguments=[
"--target_folder",
dataset,
"--input",
input_folder,
],
allow_reuse=True,
)
# pipeline 的本质还是实验,所以需要建立实验,再把 pipeline带入
experiment = Experiment(work_space, "get_currency")
pipeline = Pipeline(workspace=work_space, steps=[get_currency])
run = experiment.submit(pipeline)
run.wait_for_completion(show_output=True)
# 执行终了,发布 pipeline,以便可以重复使用
run.publish_pipeline(
name="get_currency_pipeline",
description="Get currency with pipeline",
version="1.0",
)
if __name__ == "__main__":
main()
然後,执行python3.7 run_pipeline_data.py
。执行的结果可以,直接到workspace
的实验页面查询。点选刚刚执行的管线,可以看到执行的过程被画成流程图。
点进步骤
,再点选执行完的步骤,则会看到该实验的各种细节,也方便後续除错。
这篇算是让我搞懂pipeline
,下一篇要再加上模型训练和服务部署,完成一条龙服务。
<<: [Day 11] -『 GO语言学习笔记』- switch 叙述
>>: 要上传档案,你需要知道的事-stream binary
在先前[Day 09] tinyML开胃菜Arduino IDE上桌(下)已经简单介绍过Arduin...
前言 大家好,昨天我们稍微讲解了一下在笔者团队的研究中所需的前置步骤,那麽今天我们就来进入程序码的部...
嘿~~ 各位好,我是菜市场阿龙! 这集要介绍的是「物件导向程序设计」 频道:https://www....
上一篇我们统整了 NodeTransforms 里各个 methods 的用法以及参数介绍, 传送...
昨天Go弄出了Hello World,今天就来解释怎麽写的。 Golang入门一点 先贴上昨天写的程...