Day 26 Azure machine learning: Pipeline for model and service- 把工作通通串起来

Azure machine learning: Pipeline for model and service- 把工作通通串起来

接续上一篇,上一篇搞懂了pipeline在做什麽之後,这篇就显得格外轻松,把之前模型训练和服务部署的流程,考虑後续维护的情况,全部丢给pipeline执行就对了。依照前面的描述,这边会为了模型训练和服务部署,各准备一个 py 档,最後再以另一个 py 档执行pipeline

示范程序

train_lstm.py之前介绍过,这边为了使用pipeline执行,也考虑後续重复使用的情况,这边再稍微改造一下。下面针对新增的内容解释:

train_lstm.py

import argparse
import os
import pickle
import numpy as np
from azureml.core.run import Run
from azureml.core.model import Model
import pandas as pd
from keras.models import Sequential, load_model
from keras.layers import Dense, LSTM, Dropout
from keras.preprocessing.sequence import TimeseriesGenerator
from keras.callbacks import TensorBoard, EarlyStopping


def data_generator(data, data_len=240):
    """
    generate data for training and validation
    """
    generator = TimeseriesGenerator(
        data=data, targets=range(data.shape[0]), length=data_len, batch_size=1, stride=1
    )
    x_all = []
    for i in generator:
        x_all.append(i[0][0])
    x_all = np.array(x_all)
    y_all = data[range(data_len, len(x_all) + data_len)]
    rate = 0.4
    x_train = x_all[: int(len(x_all) * (1 - rate))]
    y_train = y_all[: int(y_all.shape[0] * (1 - rate))]
    x_val = x_all[int(len(x_all) * (1 - rate)) :]
    y_val = y_all[int(y_all.shape[0] * (1 - rate)) :]
    return x_train, y_train, x_val, y_val


def parse_args():
    """
    Parse arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--target_folder", type=str, help="Path to the training data")
    parser.add_argument(
        "--experiment",
        type=bool,
        default=False,
        help="Just run an experiment, there is no pipeline",
    )
    parser.add_argument(
        "--log_folder", type=str, help="Path to the log", default="./logs"
    )
    args = parser.parse_args()
    return args

# 考虑後续训练时,先读取效果最好的模型,以此为基础继续训练
def load_best_model(work_space, model_name, x_val, y_val):
    """
    load the best model from registered models
    """
    model_obj = Model(work_space, model_name)
    # 取得模型清单,撷取最近五个版本。除了版本 version 以外,属性 properties 也是可以作为选择模型的依据
    model_list = model_obj.list(work_space, name=model_name)
    version = [i.version for i in model_list]
    version.sort(reverse=True)
    # 选择最近训练的五个模型,并且以最近一段时间的资料评估模型的效果
    version = version[:5]
    val_loss = []
    for i in version:
        print(i)
        model_obj = Model(work_space, model_name, version=i)
        model_path = model_obj.download(exist_ok=True)
        model = load_model(model_path)
        val_loss.append(model.evaluate(x_val, y_val))
    # 选择 loss 最小的模型
    model_obj = Model(
        work_space, model_name, version=version[val_loss.index(min(val_loss))]
    )
    model_path = model_obj.download(exist_ok=True)
    model = load_model(model_path)
    return model, min(val_loss), version[val_loss.index(min(val_loss))]



def main():
    """
    Training of LeNet with keras
    """
    args = parse_args()
    # 在`workspace`执行时,取得当下资讯
    run = Run.get_context()
    usd_twd = pd.read_csv(os.path.join(args.target_folder, "training_data.csv"))
    data = usd_twd.Close.values.reshape(-1, 1)
    with open(os.path.join(args.target_folder, "scaler.pickle"), "rb") as f_h:
        scaler = pickle.load(f_h)
    f_h.close()
    data = scaler.transform(data)
    data_len = 240
    x_train, y_train, x_val, y_val = data_generator(data, data_len)
    # 单纯执行实验时,需要先定义模型架构,并且使用tensorboard
    loss_threshold = 1
    version = 0
    if args.experiment:
        model = Sequential()
        model.add(LSTM(16, input_shape=(data_len, 1)))
        model.add(Dropout(0.1))
        model.add(Dense(1))
        model.compile(loss="mse", optimizer="adam")
        # Tensorboard
        callback = TensorBoard(
            log_dir=args.log_folder,
            histogram_freq=0,
            write_graph=True,
            write_images=True,
            embeddings_freq=0,
            embeddings_layer_names=None,
            embeddings_metadata=None,
        )
    # 执行 pipeline 时,先读取效果最好的模型
    else:
        # 取得`workspace`权限
        work_space = run.experiment.workspace
        model, loss_threshold, version = \
        load_best_model(work_space,
                        model_name="currency",
                        x_val=x_val,
                        y_val=y_val)
        origin_model = model
        print("Load Model")
        # 如果 val_loss 进步太慢,就结束训练
        callback = EarlyStopping(monitor="val_loss",
                                 mode="min",
                                 min_delta=1e-8,
                                 patience=50)
        
    # train the network
    history_callback = model.fit(
        x_train,
        y_train,
        epochs=1000,
        batch_size=240,
        verbose=1,
        validation_data=[x_val, y_val],
        callbacks=[callback],
    )
    print("Finished Training")
    # 以 validation data 确认模型的效果,保留效果好的模型
    metrics = history_callback.history
    # 若刚训练好的模型比之前模型效果好,将训练的细节记录下来
    if metrics["val_loss"][-1] <= loss_threshold:
        run.log_list("train_loss", metrics["loss"][:10])
        run.log_list("val_loss", metrics["val_loss"][:10])
        run.log_list("start", [usd_twd.Date.values[0]])
        run.log_list("end", [usd_twd.Date.values[-1]])
        run.log_list("epoch", [len(history_callback.epoch)])
        run.log_list("last_version", [version])
        model.save("outputs/keras_lstm.h5")
        properties = {
            "train_loss": metrics["loss"][-1],
            "val_loss": metrics["val_loss"][-1],
            "data": "USD/TWD from {0} to {1}".format(
                usd_twd.Date.values[0], usd_twd.Date.values[-1]
            ),
            "epoch": len(history_callback.epoch),
            "last_version": version,
        }
    # 反之,则记录 val_loss,以及说明此模型是继承哪一个版本的模型
    else:
        run.log_list("val_loss", [loss_threshold])
        run.log_list("last_version", [version])
        origin_model.save("outputs/keras_lstm.h5")
        properties = {"val_loss": loss_threshold, "last_version": version}
    if args.experiment:
        with open("outputs/scaler.pickle", "wb") as f_h:
            pickle.dump(scaler, f_h)
        f_h.close()
    else:
    # 为了让整个流程自动化,所以训练完,直接在`workspace`注册模型
        model = Model.register(
            workspace=work_space,
            model_name="currency",
            tags={"model": "LSTM"},
            model_path="outputs/keras_lstm.h5",
            model_framework="keras",
            model_framework_version="2.2.4",
            properties=properties,
        )
        print("Registered Model")


if __name__ == "__main__":
    main()

在训练之後,选择当下最佳的模型注册,这样在部署服务时,就可以直接使用最新版的模型部署服务。
deploy_currency_prediction的内容不变,只是在执行pipeline时,workspace的权限也是必须靠Run取得。

deploy_currency_prediction


import os
import numpy as np
from azureml.core import Model, Workspace
from azureml.core import Run
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice
from azureml.core.authentication import InteractiveLoginAuthentication


def main():
    """
    Deploy model to your service
    """
    run = Run.get_context()
    try:
        work_space = run.experiment.workspace
    except AttributeError:
        interactive_auth = InteractiveLoginAuthentication(
            tenant_id=os.getenv("TENANT_ID")
        )
        work_space = Workspace.from_config(auth=interactive_auth)
    environment = work_space.environments["train_lstm"]
    model = Model(work_space, "currency")
    service_name = "currency-service"
    inference_config = InferenceConfig(
        entry_script="predict_currency.py", environment=environment
    )
    aci_config = AciWebservice.deploy_configuration(cpu_cores=1, memory_gb=1)
    scaler = Model(work_space, name="scaler", version=1)
    service = Model.deploy(
        workspace=work_space,
        name=service_name,
        models=[model, scaler],
        inference_config=inference_config,
        deployment_config=aci_config,
        overwrite=True,
    )
    service.wait_for_deployment(show_output=True)
    print(service.get_logs())
    print(service.scoring_uri)


if __name__ == "__main__":
    main()


跟之前上一篇的run_pipeline_data.py比较起来,这边会再新增两个流程到pipeline之中,分别是为了模型训练和服务部署。

run_pipeline.py

import os
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import PythonScriptStep
from azureml.core.runconfig import RunConfiguration
from azureml.core import Workspace, Experiment, Dataset
from azureml.core.authentication import InteractiveLoginAuthentication
from azureml.pipeline.core import Pipeline


def main():
    interactive_auth = InteractiveLoginAuthentication(tenant_id=os.getenv("TENANT_ID"))
    work_space = Workspace.from_config(auth=interactive_auth)
    datastore = work_space.get_default_datastore()
    input_folder = (
        Dataset.File.from_files(path=(datastore, "currency"))
        .as_named_input("input_folder")
        .as_mount()
    )
    dataset = OutputFileDatasetConfig(
        name="usd_twd", destination=(datastore, "currency")
    )
    aml_run_config = RunConfiguration()
    environment = work_space.environments["train_lstm"]
    aml_run_config.environment = environment
    # 更新资料的步骤
    get_currency = PythonScriptStep(
        source_directory=".",
        name="get_currency",
        script_name="get_currency.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=[
            "--target_folder",
            dataset.as_upload(overwrite=True).register_on_complete(name="currency"),
            "--input",
            input_folder,
        ],
        allow_reuse=True,
    )
    # 训练模型的步骤
    training = PythonScriptStep(
        source_directory=".",
        name="train_lstm",
        script_name="train_lstm.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        arguments=["--target_folder", dataset.as_input()],
        allow_reuse=True,
    )
    # 部署服务的步骤
    deploy = PythonScriptStep(
        source_directory=".",
        name="deploy_currency_prediction",
        script_name="deploy_currency_prediction.py",
        compute_target="cpu-cluster",
        runconfig=aml_run_config,
        allow_reuse=True,
    )
    experiment = Experiment(work_space, "pipeline_data_train_deploy")

    pipeline = Pipeline(workspace=work_space, steps=[get_currency, training, deploy])
    run = experiment.submit(pipeline)
    run.wait_for_completion(show_output=True)
    # Pipeline 必须被发布,才能在後续进行排程(schedule)
    run.publish_pipeline(
        name="pipeline_data_train_deploy",
        description="data-->train-->deploy",
        version="1.0",
    )


if __name__ == "__main__":
    main()


执行python3.7 run_pipeline.py完成後,一样可以从workspace观察结果和pipeline的流程图。


Azure machine learning 的介绍即将进入尾声,既然都有了pipeline可以从把集资料、训练模型、部署服务,全部都串成起来,依照顺序执行,最後一件事情就是将pipeline排程。下一篇将介绍如何在workspace排程。


<<:  10. 人人皆可为师

>>:  D20 Email认证信 SMTP - Gmail

声明(claim)

-身份和存取管理 典型的身份验证过程包括三个步骤: 主体向身份提供者 (IdP) 表明其身份。 I...

[Day-6] C++关於运算的小学习

上次总结了过去所学到的知识 做了许多有趣的小练习 那这次要来学习的是C++的「运算式、运算子、运算元...

Day–3 Excel之快速移动栏列之术

欢迎大家,来到了第三天依旧充满干劲,今天要来跟大家介绍Excel的文字格式,以及今天的重点调整栏列的...

Day20:终於要进去新手村了-Javascript-函式-建立函式

终於来到函式的基础,建立函式了,以下就是基本的语法架构: function 函式名称(这个位置用来呼...

React-使用useRef跨组件操作DOM

"我想要在React上实现同一页在menu上点击,就滑到对应的区块,该怎麽做呢?"...