ホーム » TensorFlow » TensorFlow : Deploy : 分散 TensorFlow

TensorFlow : Deploy : 分散 TensorFlow

TensorFlow : Deploy : 分散 TensorFlow(翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
更新日時 : 09/16/2017
作成日時 : 04/16/2016

* 本ページは、TensorFlow 本家サイトの Deploy : Distributed TensorFlow を翻訳した上で
適宜、補足説明したものです:

* (obsolete) 本ページは、TensorFlow 本家サイトの以下のページを翻訳した上で適宜、補足説明したものです:
    https://www.tensorflow.org/versions/r0.8/how_tos/distributed/index.html
* サンプルコードの動作確認はしておりますが、適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 
本文

この文書は TensorFlow サーバのクラスタをどのように作成し、そのクラスタに渡って計算グラフをどのように分散するかを示します。TensorFlow プログラムを書く 基本コンセプト に慣れていることを想定しています。

 

Hello 分散 (distributed) TensorFlow!

このチュートリアルではあなたが TensorFlow nightly build を使用していることを想定しています。ローカル・サーバを次のように開始して使用することであなたのインストールをテストすることができます :

# TensorFlow サーバを単一プロセス「クラスタ」として開始します。
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, 分散 TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # サーバ上でセッションを作成します。
>>> sess.run(c)
'Hello, 分散 TensorFlow!'

tf.train.Server.create_local_server() (訳注: リンク切れ) メソッドは、in-process サーバで、単一プロセス・クラスタを作成します。

 

クラスタを作成する

TensorFlow「クラスタ」は、TensorFlow グラフの分散実行に参加する「タスク」の集合です。各タスクは TensorFlow「サーバ」と関係し、これはセッションを作成するために使用可能な「マスター」、そしてグラフにおける演算を実行する「ワーカー」を含みます。クラスタは一つまたはそれ以上の「ジョブ」にも分割されます、そこでは各ジョブは一つまたはそれ以上のタスクを含みます。

クラスタを作成するためには、クラスタ内でタスク毎に TensorFlow サーバ一つを開始します。各タスクは典型的には異なるマシン上実行されますが、同じマシンで複数のタスクを実行することもできます(e.g. 異なる GPU デバイスを制御するため等)。各タスクでは、次を行ないます :

  • tf.train.ClusterSpec を作成します、これはクラスタの全てのタスクを記述します。これは各タスクで同じであるべきです。
  • tf.train.Server を作成します、コンストラクタには tf.train.ClusterSpec を渡して、ジョブ名とタスク・インデックスでローカル・タスクを識別します。

クラスタを記述するために tf.train.ClusterSpec を作成する

tf.train.ClusterSpec コンストラクション

利用可能なタスク
tf.train.ClusterSpec({“local”: [“localhost:2222”, “localhost:2223”]})
/job:local/task:0
/job:local/task:1
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222", 
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1

各タスクで tf.train.Server インスタンスを作成する

tf.train.Server オブジェクトはローカル・デバイスの集合、tf.train.ClusterSpec における他のタスクへの接続 (connections) の集合、そして分散計算を遂行するためにこれらを使用できる「セッション・ターゲット」を含みます。各サーバは特定の名前のジョブのメンバーで、そのジョブ内のタスクインデックスを持ちます。サーバはクラスタにおける任意の他のサーバと通信可能です

例えば、 localhost:2222 と localhost:2223 上で動作する2つのサーバによるクラスタを起動するためには、ローカルマシン上で次のスニペットを2つの異なるプロセスで実行します :

# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)

# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)

[Note] これらのクラスタ仕様を手動で指定するのは長く退屈なものです、特に大規模クラスタに対しては。私たちはプログラミング的にタスクを起動するためのツールについてワークしています、e.g. Kubernetes のようなクラスタ・マネージャを使用します。サポートを望む特定のクラスタ・マネージャがある場合には、GitHub issue をあげてください。

 

モデルで分散デバイスを指定する

特定のプロセスに演算を配置するためには、ops を CPU か GPU で実行するかを指定するのに使用されるのと同じ tf.device() 関数をが使用できます。例えば :

with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)

上の例では、変数は ps ジョブの2つのタスク上に作成され、そしてモデルの数値計算部はワーカー・ジョブで作成されます。TensorFlow はジョブ間に適当なデータ転送を挿入します(forward pass のためには ps からワーカーに、そして勾配の適用のためにはワーカーから ps に)。

 

レプリケーションされた (Replicated) 訓練

「データ並列性 (data parallelism)」と呼ばれる、一般的な訓練構成 (configuration) は同じモデルをデータ上の異なるミニ・バッチで訓練する、ワーカー・ジョブの複数のタスクを含み、ps ジョブにおける一つまたは複数のタスクにホストされる共有パラメータを更新します。全てのタスクは典型的には異なるマシン上で実行されます。TensorFlow でこの構造を指定するには多くの方法あり、レプリケーション・モデル (replicated model) を指定するワークを単純にするライブラリを構築しています。可能なアプローチは次のようなものです :

  • グラフ内 (In-graph) レプリケーション。このアプローチでは、クライアントは、(/job:ps に固定された tf.Variable ノードの) パラメータのワンセットを含む 単一の tf.Graph を構築します; そしてモデルの数値計算部の複数のコピーは、それぞれ /job:worker の異なるタスクに固定されます。
  • グラフ間 (Between-graph) レプリケーション。このアプローチでは、各 /job:worker タスクに対して分離したクライアントがあり、典型的にはワーカータスクとしての同じプロセス内にあります。各クライアントはパラメータを含む同様のグラフを構築します。(パラメータは同じように /job:ps に固定されてそれらを同じタスクに決定論的にマップするために tf.train.replica_device_setter() (訳注: リンク切れ) を使用します); そしてモデルの数値計算部の一つのコピーは、/job:worker のローカルタスクに固定されます。
  • 非同期 (Asynchronous) 訓練。このアプローチでは、グラフの各レプリカは、連携なし (w/o coordination) に実行される独立的な訓練ループを持ちます。上のリプリケーション形式の両者と互換性があります。
  • 同期 (Synchronous) 訓練。このアプローチでは、全てのレプリカは 現在のパラメータについて同じ値を読み、並列に勾配を計算し、そしてそれらを一緒に適用します。これは in-graph リプリケーションと互換で(e.g. CIFAR-10 マルチ-GPU トレーナー 内のように勾配平均を使用)、そして between-graph レプリケーションと互換です(e.g. tf.train.SyncReplicasOptimizer を使用)。

まとめ: サンプル・トレーナー・プログラム

次のコードは分散トレーナー・プログラムのスケルトンを示し、between-graph リプリケーションと非同期訓練を実装しています。これはパラメータ・サーバとワーカー・タスクのためのコードを含みます。

import tensorflow as tf

# tf.train.ClusterSpec を定義するためのフラグ
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# tf.train.Server を定義するためのフラグ
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts(",")

  # パラメータ・サーバとワーカー・ホストからクラスタを作成します。
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
  
  # ローカル・タスクのためのサーバを作成して開始します。
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # デフォルトで ops をローカル・ワーカーに割り当てます。
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # モデルを構築します...
      loss = ...
      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.merge_all_summaries()
      init_op = tf.initialize_all_variables()

    # 「スーパーバイザ」を作成します、これは訓練プロセスを監視します。
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    # スーパーバイザはセッション初期化をケアしてチェックポイントから復旧します。
    sess = sv.prepare_or_wait_for_session(server.target)

    # 入力パイプラインに対してキュー・ランナーを開始します(もしあれば)。
    sv.start_queue_runners(sess)

    # スーパーバイザがシャットダウンするまでロープします(あるいは 1000000 ステップが完了するまで)。
    step = 0
    while not sv.should_stop() and step < 1000000:
      # 訓練ステップを非同期に実行します。
      # *同期* 訓練をどのように遂行するかについての追加の詳細情報は `tf.train.SyncReplicasOptimizer` を参照。
      _, step = sess.run([train_op, global_step])


if __name__ == "__main__":
  tf.app.run()

2つのパラメータ・サーバと2つのワーカーでトレーナーを開始するには、次のコマンドラインを使用します(スクリプトは trainer.py とします) :

# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1
 

用語集

[クライアント]
クライアントは典型的には、TensorFlow グラフを構築してクラスタと相互作用するために `tensorflow::Session` をコンストラクトするプログラムです。クライアントは典型的には Python または C++ で書かれます。単一のクライアント・プロセスは複数の TensorFlow サーバと直接的に相互作用可能です(上の「リプリケーション訓練」を参照)、そして単一のサーバは複数のクライアントにサービスが提供できます。

[クラスタ]
TensorFlow クラスタは一つまたはそれ以上の「ジョブ」から成り、それぞれは一つまたはそれ以上の「タスク」のリストに分割されます。クラスタは典型的には、多くのマシンを並列に使用してニューラルネットワークを訓練するような、特定の高位レベルの目的に貢献 (dedicated) します。クラスタは `tf.train.ClusterSpec` オブジェクトで定義されます。

[ジョブ]
ジョブは、典型的には共通の目的のためにサービスを提供する、「タスク」のリストから成ります。例えば、`ps` (「パラメータ・サーバ (parameter server)」)と命名されるジョブは典型的には変数をストアして更新するホスト・ノードです; 一方、`worker` と命名されるジョブは典型的には数値計算タスクを遂行するホスト・ステートレス・ノードです。ジョブのタスクは典型的には異なるマシン上で実行されます。ジョブのセットの役割は柔軟です: 例えば、ある `worker` はある状態を維持するかもしれません。

[マスター・サービス]
RPC サービスで、分散デバイスのセットへのリモート・アクセスを提供し、セッション・ターゲットとして動作します。マスター・サービスは tensorflow::Session インターフェイスを実装し、そして一つまたはそれ以上の「ワーカー・サービス」に渡るワークをコーディネートする責任があります。全ての TensorFlow サーバはマスター・サービスを実装します。

[タスク]
タスクは特定の TensorFlow サーバに該当し、そして典型的には単一のプロセスに該当します。タスクは特定の「ジョブ」に属してジョブのタスクリストのインデックスで識別されます。

[TensorFlow サーバ]
tf.train.Server インスタンスを実行するプロセスで、クラスタのメンバーで、そして「マスター・サービス」と「ワーカー・サービス」をエクスポートします。

[ワーカー・サービス]
RPC サービスで、ローカルデバイスを使用して TensorFlow グラフの一部を実行します。ワーカー・サービスは worker_service.proto を実行します。全ての TensorFlow サーバはワーカー・サービスを実装します。

 

以上

AI導入支援 #2 ウェビナー

スモールスタートを可能としたAI導入支援   Vol.2
[無料 WEB セミナー] [詳細]
「画像認識 AI PoC スターターパック」の紹介
既に AI 技術を実ビジネスで活用し、成果を上げている日本企業も多く存在しており、競争優位なビジネスを展開しております。
しかしながら AI を導入したくとも PoC (概念実証) だけでも高額な費用がかかり取組めていない企業も少なくないようです。A I導入時には欠かせない PoC を手軽にしかも短期間で認知度を確認可能とするサービの紹介と共に、AI 技術の特性と具体的な導入プロセスに加え運用時のポイントについても解説いたします。
日時:2021年10月13日(水)
会場:WEBセミナー
共催:クラスキャット、日本FLOW(株)
後援:働き方改革推進コンソーシアム
参加費: 無料 (事前登録制)
人工知能開発支援
◆ クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
(株)クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com