Kubeflow 1.0 : パイプライン : パイプラインの理解 : Kubeflow パイプラインの概要 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 04/13/2020 (1.0)
* 本ページは、Kubeflow の以下のページを翻訳した上で適宜、補足説明したものです:
- Pipelines : Understanding Pipelines : Overview of Kubeflow Pipelines
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

パイプライン : パイプラインの理解 : Kubeflow パイプラインの概要
Kubeflow は Docker コンテナをベースに可搬な、スケーラブルな機械学習 (ML) ワークフローを構築して配備するためのプラットフォームです。
Kubeflow パイプラインとは何でしょう?
Kubeflow パイプライン・プラットフォームは以下から成ります :
- 実験、ジョブと実行を管理して追跡するためのユーザインターフェイス (UI)。
- マルチステップ ML ワークフローをスケジュールするためのエンジン。
- パイプラインとコンポーネントを定義して操作するための SDK。
- SDK を使用してシステムと相互作用するためのノートブック。
以下は Kubeflow パイプラインの目標です :
- End-to-end orchestration: 機械学習パイプラインの編成を有効にして単純化します。
- Easy experimentation: 多数のアイデアとテクニックを試して様々なトライアル/実験を管理することを容易にします。
- Easy re-use: end-to-end ソリューションを素早く作成するために毎回再構築しなければならないことなく、コンポーネントとパイプラインを再利用することを可能にします。
Kubeflow パイプラインは Kubeflow の中心的なコンポーネントあるいはスタンドアロン・インストールとして利用可能です。
パイプラインとは何でしょう?
パイプラインはワークフローの総てのコンポーネントを含む ML ワークフローとそれらがグラフの形式でどのように組み合わされるかの記述です。(パイプラインのグラフのサンプルを示す下のスクリーンショットを見てください。) パイプラインはパイプラインを実行するために必要な入力 (パラメータ) と各コンポーネントの入力と出力の定義を含みます。
貴方のパイプラインを開発後、Kubeflow パイプライに UI 上それをアップロードして共有できます。
パイプラインコンポーネントはパイプラインの 1 ステップを遂行する、Docker イメージ としてパッケージ化された、ユーザコードの自己充足的な (= self-contained) セットです。例えば、コンポーネントはデータ前処理、データ変換、モデル訓練等々の責任を負うことができます。
パイプライン と コンポーネント への概念ガイドを見てください。
パイプラインのサンプル
下のスクリーンショットとコードは xgboost-training-cm.py を示します、これは CSV フォーマットの構造化データを使用して XGBoost モデルを作成します。Github 上でパイプラインについてのソースコードと他の情報を見ることができます。
パイプラインのランタイム実行グラフ
下のスクリーンショットはサンプル・パイプラインの Kubeflow パイプライン UI のランタイム実行グラフを示します :

パイプラインを表す Python コード
下は xgboost-training-cm.py パイプラインを定義する Python コードからの抜粋です。GitHub で full コードを見ることができます。
@dsl.pipeline(
name='XGBoost Trainer',
description='A trainer that does end-to-end distributed training for XGBoost models.'
)
def xgb_train_pipeline(
output='gs://your-gcs-bucket',
project='your-gcp-project',
cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
region='us-central1',
train_data='gs://ml-pipeline-playground/sfpd/train.csv',
eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
schema='gs://ml-pipeline-playground/sfpd/schema.json',
target='resolution',
rounds=200,
workers=2,
true_label='ACTION',
):
output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'
# Current GCP pyspark/spark op do not provide outputs as return values, instead,
# we need to use strings to pass the uri around.
analyze_output = output_template
transform_output_train = os.path.join(output_template, 'train', 'part-*')
transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
train_output = os.path.join(output_template, 'train_output')
predict_output = os.path.join(output_template, 'predict_output')
with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
project_id=project,
region=region,
name=cluster_name
)):
_create_cluster_op = dataproc_create_cluster_op(
project_id=project,
region=region,
name=cluster_name,
initialization_actions=[
os.path.join(_PYSRC_PREFIX,
'initialization_actions.sh'),
],
image_version='1.2'
)
_analyze_op = dataproc_analyze_op(
project=project,
region=region,
cluster_name=cluster_name,
schema=schema,
train_data=train_data,
output=output_template
).after(_create_cluster_op).set_display_name('Analyzer')
_transform_op = dataproc_transform_op(
project=project,
region=region,
cluster_name=cluster_name,
train_data=train_data,
eval_data=eval_data,
target=target,
analysis=analyze_output,
output=output_template
).after(_analyze_op).set_display_name('Transformer')
_train_op = dataproc_train_op(
project=project,
region=region,
cluster_name=cluster_name,
train_data=transform_output_train,
eval_data=transform_output_eval,
target=target,
analysis=analyze_output,
workers=workers,
rounds=rounds,
output=train_output
).after(_transform_op).set_display_name('Trainer')
_predict_op = dataproc_predict_op(
project=project,
region=region,
cluster_name=cluster_name,
data=transform_output_eval,
model=train_output,
target=target,
analysis=analyze_output,
output=predict_output
).after(_train_op).set_display_name('Predictor')
_cm_op = confusion_matrix_op(
predictions=os.path.join(predict_output, 'part-*.csv'),
output_dir=output_template
).after(_predict_op)
_roc_op = roc_op(
predictions_dir=os.path.join(predict_output, 'part-*.csv'),
true_class=true_label,
true_score_column=true_label,
output_dir=output_template
).after(_predict_op)
dsl.get_pipeline_conf().add_op_transformer(
gcp.use_gcp_secret('user-gcp-sa'))
Kubeflow パイプライン UI 上のパイプライン入力データ
下の部分的なスクリーンショットはパイプラインの実行を始めるための Kubeflow パイプライン UI を示します。貴方のコードのパイプライン定義はどのパラメータが UI フォームに現れるかを決定します。パイプライン定義はまたパラメータのためのデフォルト値を設定できます :

パイプラインからの出力
以下のスクリーンショットは Kubeflow パイプライン UI 上可視なパイプライン出力のサンプルを示します。
予測結果 :

混同行列 :

Receiver operating characteristics (ROC) カーブ:

アーキテクチャ概要

高位では、パイプラインの実行は以下のように進みます :
- Python SDK: Kubeflow パイプライン・ドメイン固有言語 (DSL) を使用してコンポーネントを作成したりパイプラインを指定します。
- DSL コンパイラ: DSL コンパイラ はパイプラインの Python コードを静的 configuration (YAML) に変換します。
- パイプライン・サービス: 静的 configuration からパイプライン実行を作成するためにパイプライン・サービスを呼び出します。
- Kubernetes リソース: パイプラインを実行するために必要な Kubernetes リソース ( CRD ) を作成するために Kubernetes API サーバを呼び出します。
- Orchestration コントローラ: orchestration コントローラのセットはパイプラインを完了するために必要なコンテナを実行します。コンテナは仮想マシン上の Kubernetes ポッド内で実行されます。サンプル・コントローラは Argo ワークフロー コントローラで、これはタスク駆動のワークフローを編成します。
- Artifact ストレージ: ポッドは 2 つの種類のデータをストアします :
- Metadata: 実験、ジョブ、パイプライン実行、そして単一スカラー・メトリクス。メトリックデータはソートとフィルタリングの目的で集積されます。Kubeflow パイプラインは MySQL データベースにメタデータをストアします。
- Artifacts: パイプライン・パッケージ、ビュー、そして巨大スケール・メトリクス (時系列)。パイプライン実行をデバッグしたり個々の実行のパフォーマンスを調べたりするために巨大スケール・メトリクスを使用します。Kubeflow パイプラインは artifact を Minio サーバ や Cloud ストレージ のような artifact ストアにストアします。
MySQL データベースと Minio サーバは両者とも Kubernetes PersistentVolume サブシステムにバックアップされます。
- Persistence エージェントと ML メタデータ: パイプライン Persistence Agent はパイプライン・サービスにより作成された Kubernetes リソースを監視してそしてこれらのリソースの状態を ML メタデータ・サービスに永続化します。パイプライン Persistence エージェントは実行されたコンテナのセットとそれらの入力と出力を記録します。入力/出力はコンテナ・パラメータかデータ artifacts URI から成ります。
- Pipeline web サーバ: パイプライン web サーバは関連ビューを表示するために様々なサービスからデータを集めます : 現在実行中のパイプラインのリスト、パイプライン実行の履歴、データ artifacts のリスト、個々のパイプライン実行についてのデバッグ情報、個々のパイプラインについての実行ステータス。
以上