ホーム » Kubeflow » Kubeflow 1.0 : パイプライン : パイプラインの理解 : Kubeflow パイプラインの概要

Kubeflow 1.0 : パイプライン : パイプラインの理解 : Kubeflow パイプラインの概要

Kubeflow 1.0 : パイプライン : パイプラインの理解 : Kubeflow パイプラインの概要 (翻訳/解説)

翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 04/13/2020 (1.0)

* 本ページは、Kubeflow の以下のページを翻訳した上で適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

 

パイプライン : パイプラインの理解 : Kubeflow パイプラインの概要

Kubeflow は Docker コンテナをベースに可搬な、スケーラブルな機械学習 (ML) ワークフローを構築して配備するためのプラットフォームです。

 

Kubeflow パイプラインとは何でしょう?

Kubeflow パイプライン・プラットフォームは以下から成ります :

  • 実験、ジョブと実行を管理して追跡するためのユーザインターフェイス (UI)。
  • マルチステップ ML ワークフローをスケジュールするためのエンジン。
  • パイプラインとコンポーネントを定義して操作するための SDK。
  • SDK を使用してシステムと相互作用するためのノートブック。

以下は Kubeflow パイプラインの目標です :

  • End-to-end orchestration: 機械学習パイプラインの編成を有効にして単純化します。
  • Easy experimentation: 多数のアイデアとテクニックを試して様々なトライアル/実験を管理することを容易にします。
  • Easy re-use: end-to-end ソリューションを素早く作成するために毎回再構築しなければならないことなく、コンポーネントとパイプラインを再利用することを可能にします。

Kubeflow パイプラインは Kubeflow の中心的なコンポーネントあるいはスタンドアロン・インストールとして利用可能です。

 

パイプラインとは何でしょう?

パイプラインはワークフローの総てのコンポーネントを含む ML ワークフローとそれらがグラフの形式でどのように組み合わされるかの記述です。(パイプラインのグラフのサンプルを示す下のスクリーンショットを見てください。) パイプラインはパイプラインを実行するために必要な入力 (パラメータ) と各コンポーネントの入力と出力の定義を含みます。

貴方のパイプラインを開発後、Kubeflow パイプライに UI 上それをアップロードして共有できます。

パイプラインコンポーネントはパイプラインの 1 ステップを遂行する、Docker イメージ としてパッケージ化された、ユーザコードの自己充足的な (= self-contained) セットです。例えば、コンポーネントはデータ前処理、データ変換、モデル訓練等々の責任を負うことができます。

パイプラインコンポーネント への概念ガイドを見てください。

 

パイプラインのサンプル

下のスクリーンショットとコードは xgboost-training-cm.py を示します、これは CSV フォーマットの構造化データを使用して XGBoost モデルを作成します。Github 上でパイプラインについてのソースコードと他の情報を見ることができます。

 

パイプラインのランタイム実行グラフ

下のスクリーンショットはサンプル・パイプラインの Kubeflow パイプライン UI のランタイム実行グラフを示します :

 

パイプラインを表す Python コード

下は xgboost-training-cm.py パイプラインを定義する Python コードからの抜粋です。GitHub で full コードを見ることができます。

@dsl.pipeline(
    name='XGBoost Trainer',
    description='A trainer that does end-to-end distributed training for XGBoost models.'
)
def xgb_train_pipeline(
    output='gs://your-gcs-bucket',
    project='your-gcp-project',
    cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
    region='us-central1',
    train_data='gs://ml-pipeline-playground/sfpd/train.csv',
    eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
    schema='gs://ml-pipeline-playground/sfpd/schema.json',
    target='resolution',
    rounds=200,
    workers=2,
    true_label='ACTION',
):
    output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'

    # Current GCP pyspark/spark op do not provide outputs as return values, instead,
    # we need to use strings to pass the uri around.
    analyze_output = output_template
    transform_output_train = os.path.join(output_template, 'train', 'part-*')
    transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
    train_output = os.path.join(output_template, 'train_output')
    predict_output = os.path.join(output_template, 'predict_output')

    with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
        project_id=project,
        region=region,
        name=cluster_name
    )):
        _create_cluster_op = dataproc_create_cluster_op(
            project_id=project,
            region=region,
            name=cluster_name,
            initialization_actions=[
              os.path.join(_PYSRC_PREFIX,
                           'initialization_actions.sh'),
            ],
            image_version='1.2'
        )

        _analyze_op = dataproc_analyze_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            schema=schema,
            train_data=train_data,
            output=output_template
        ).after(_create_cluster_op).set_display_name('Analyzer')

        _transform_op = dataproc_transform_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=train_data,
            eval_data=eval_data,
            target=target,
            analysis=analyze_output,
            output=output_template
        ).after(_analyze_op).set_display_name('Transformer')

        _train_op = dataproc_train_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=transform_output_train,
            eval_data=transform_output_eval,
            target=target,
            analysis=analyze_output,
            workers=workers,
            rounds=rounds,
            output=train_output
        ).after(_transform_op).set_display_name('Trainer')

        _predict_op = dataproc_predict_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            data=transform_output_eval,
            model=train_output,
            target=target,
            analysis=analyze_output,
            output=predict_output
        ).after(_train_op).set_display_name('Predictor')

        _cm_op = confusion_matrix_op(
            predictions=os.path.join(predict_output, 'part-*.csv'),
            output_dir=output_template
        ).after(_predict_op)

        _roc_op = roc_op(
            predictions_dir=os.path.join(predict_output, 'part-*.csv'),
            true_class=true_label,
            true_score_column=true_label,
            output_dir=output_template
        ).after(_predict_op)

    dsl.get_pipeline_conf().add_op_transformer(
        gcp.use_gcp_secret('user-gcp-sa'))

 

Kubeflow パイプライン UI 上のパイプライン入力データ

下の部分的なスクリーンショットはパイプラインの実行を始めるための Kubeflow パイプライン UI を示します。貴方のコードのパイプライン定義はどのパラメータが UI フォームに現れるかを決定します。パイプライン定義はまたパラメータのためのデフォルト値を設定できます :

 

パイプラインからの出力

以下のスクリーンショットは Kubeflow パイプライン UI 上可視なパイプライン出力のサンプルを示します。

予測結果 :

混同行列 :

Receiver operating characteristics (ROC) カーブ:

 

アーキテクチャ概要

高位では、パイプラインの実行は以下のように進みます :

  • Python SDK: Kubeflow パイプライン・ドメイン固有言語 (DSL) を使用してコンポーネントを作成したりパイプラインを指定します。
  • DSL コンパイラ: DSL コンパイラ はパイプラインの Python コードを静的 configuration (YAML) に変換します。
  • パイプライン・サービス: 静的 configuration からパイプライン実行を作成するためにパイプライン・サービスを呼び出します。
  • Kubernetes リソース: パイプラインを実行するために必要な Kubernetes リソース ( CRD ) を作成するために Kubernetes API サーバを呼び出します。
  • Orchestration コントローラ: orchestration コントローラのセットはパイプラインを完了するために必要なコンテナを実行します。コンテナは仮想マシン上の Kubernetes ポッド内で実行されます。サンプル・コントローラは Argo ワークフロー コントローラで、これはタスク駆動のワークフローを編成します。
  • Artifact ストレージ: ポッドは 2 つの種類のデータをストアします :
    • Metadata: 実験、ジョブ、パイプライン実行、そして単一スカラー・メトリクス。メトリックデータはソートとフィルタリングの目的で集積されます。Kubeflow パイプラインは MySQL データベースにメタデータをストアします。
    • Artifacts: パイプライン・パッケージ、ビュー、そして巨大スケール・メトリクス (時系列)。パイプライン実行をデバッグしたり個々の実行のパフォーマンスを調べたりするために巨大スケール・メトリクスを使用します。Kubeflow パイプラインは artifact を Minio サーバCloud ストレージ のような artifact ストアにストアします。

    MySQL データベースと Minio サーバは両者とも Kubernetes PersistentVolume サブシステムにバックアップされます。

  • Persistence エージェントと ML メタデータ: パイプライン Persistence Agent はパイプライン・サービスにより作成された Kubernetes リソースを監視してそしてこれらのリソースの状態を ML メタデータ・サービスに永続化します。パイプライン Persistence エージェントは実行されたコンテナのセットとそれらの入力と出力を記録します。入力/出力はコンテナ・パラメータかデータ artifacts URI から成ります。
  • Pipeline web サーバ: パイプライン web サーバは関連ビューを表示するために様々なサービスからデータを集めます : 現在実行中のパイプラインのリスト、パイプライン実行の履歴、データ artifacts のリスト、個々のパイプライン実行についてのデバッグ情報、個々のパイプラインについての実行ステータス。
 

以上






AI導入支援 #2 ウェビナー

スモールスタートを可能としたAI導入支援   Vol.2
[無料 WEB セミナー] [詳細]
「画像認識 AI PoC スターターパック」の紹介
既に AI 技術を実ビジネスで活用し、成果を上げている日本企業も多く存在しており、競争優位なビジネスを展開しております。
しかしながら AI を導入したくとも PoC (概念実証) だけでも高額な費用がかかり取組めていない企業も少なくないようです。A I導入時には欠かせない PoC を手軽にしかも短期間で認知度を確認可能とするサービの紹介と共に、AI 技術の特性と具体的な導入プロセスに加え運用時のポイントについても解説いたします。
日時:2021年10月13日(水)
会場:WEBセミナー
共催:クラスキャット、日本FLOW(株)
後援:働き方改革推進コンソーシアム
参加費: 無料 (事前登録制)
人工知能開発支援
◆ クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
(株)クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com