TensorFlow 2.0 : 上級 Tutorials : 分散訓練 :- Keras でマルチワーカー訓練 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 11/02/2019
* 本ページは、TensorFlow org サイトの TF 2.0 – Advanced Tutorials – Distributed training の以下のページを翻訳した上で
適宜、補足説明したものです:
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
- Windows PC のブラウザからご参加が可能です。スマートデバイスもご利用可能です。
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション |
E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/ |
Facebook: https://www.facebook.com/ClassCatJP/ |
分散訓練 :- Keras でマルチワーカー訓練
概要
このチュートリアルは tf.distribute.Strategy API を使用して Keras モデルによるマルチワーカー分散訓練を実演します。マルチワーカー訓練のために特に設計されたストラテジーの助けにより、シングルワーカー上で動作するために設計された Keras モデルは最小限のコード変更で複数ワーカーでシームレスに動作できます。
Distributed Training in TensorFlow ガイドは TensorFlow がサポートする分散ストラテジーの概要について tf.distribute.Strategy API のより深い理解に感心がある人達のために利用可能です。
セットアップ
最初に、TensorFlow と必要なインポートをセットアップします。
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow_datasets as tfds import tensorflow as tf tfds.disable_progress_bar()
データセットを準備する
今、TensorFlow Dataset からMNIST データセットを準備しましょう。MNIST データセット は、28×28-ピクセルのモノクロ画像としてフォーマットされた、手書き数字 0-9 の 60,000 訓練サンプルと 10,000 テストサンプルから成ります。
BUFFER_SIZE = 10000 BATCH_SIZE = 64 def make_datasets_unbatched(): # Scaling MNIST data from (0, 255] to (0., 1.] def scale(image, label): image = tf.cast(image, tf.float32) image /= 255 return image, label datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True) return datasets['train'].map(scale).cache().shuffle(BUFFER_SIZE) train_datasets = make_datasets_unbatched().batch(BATCH_SIZE)
Downloading and preparing dataset mnist (11.06 MiB) to /home/kbuilder/tensorflow_datasets/mnist/1.0.0... /usr/lib/python3/dist-packages/urllib3/connectionpool.py:860: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings InsecureRequestWarning) /usr/lib/python3/dist-packages/urllib3/connectionpool.py:860: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings InsecureRequestWarning) /usr/lib/python3/dist-packages/urllib3/connectionpool.py:860: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings InsecureRequestWarning) /usr/lib/python3/dist-packages/urllib3/connectionpool.py:860: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings InsecureRequestWarning) WARNING:tensorflow:From /home/kbuilder/.local/lib/python3.6/site-packages/tensorflow_datasets/core/file_format_adapter.py:209: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version. Instructions for updating: Use eager execution and: `tf.data.TFRecordDataset(path)` WARNING:tensorflow:From /home/kbuilder/.local/lib/python3.6/site-packages/tensorflow_datasets/core/file_format_adapter.py:209: tf_record_iterator (from tensorflow.python.lib.io.tf_record) is deprecated and will be removed in a future version. Instructions for updating: Use eager execution and: `tf.data.TFRecordDataset(path)` Dataset mnist downloaded and prepared to /home/kbuilder/tensorflow_datasets/mnist/1.0.0. Subsequent calls will reuse this data.
Keras モデルを構築する
ここではMNIST データセットで訓練するために単純な畳み込みニューラルネットワーク Keras モデルを構築してコンパイルするために tf.keras.Sequential API を使用します。
def build_and_compile_cnn_model(): model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(10, activation='softmax') ]) model.compile( loss=tf.keras.losses.sparse_categorical_crossentropy, optimizer=tf.keras.optimizers.SGD(learning_rate=0.001), metrics=['accuracy']) return model
最初に小さい数のエポックの間モデルを訓練してみて総てが正しく動作するかを確実にするためにシングル・ワーカーで結果を観察しましょう。エポックが進むにつれて損失が落ちて精度が 1.0 に近づくことを見ることを期待できるはずです。
single_worker_model = build_and_compile_cnn_model() single_worker_model.fit(x=train_datasets, epochs=3)
Epoch 1/3 938/938 [==============================] - 11s 12ms/step - loss: 2.0667 - accuracy: 0.4719 Epoch 2/3 938/938 [==============================] - 2s 3ms/step - loss: 1.1673 - accuracy: 0.7724 Epoch 3/3 938/938 [==============================] - 2s 3ms/step - loss: 0.6323 - accuracy: 0.8469 <tensorflow.python.keras.callbacks.History at 0x7fc3b85f7978>
マルチワーカー Configuration
さて、マルチワーカー訓練の世界に入りましょう。TensorFlow では、複数マシン上の訓練のために TF_CONFIG 環境変数が必要です、それらの各々は多分異なる役割りを持ちます。TF_CONFIG はクラスタの一部である各ワーカー上クラスタ構成を指定するために使用されます。
TF_CONFIG の 2 つのコンポーネントがあります: cluster と task です。cluster は訓練クラスタについての情報を提供します、これは worker のようなジョブの異なる型から成る dict です。マルチワーカー訓練では、通常のワーカーが行なうものに加えて、チェックポイントをセーブして TensorBoard のために要約ファイルを書くような少し多くの責任を追う 1 つのワーカーが通常はあります。そのようなワーカーは ‘chief’ ワーカーとして参照され、インデックス 0 を持つワーカーが chief ワーカーとして任命されるのが習慣です (実際にこれは tf.distribute.Strategy がどのように実装されるかです)。他方、task は現在のタスクの情報を提供します。
このサンプルでは、タスクタイプを “worker” にそしてタスクインデックスを 0 に設定します。これはそのような設定を持つマシンが最初のワーカーであることを意味します、これはチーフ・ワーカーとして指定されて他のワーカーよりも多くの作業を行ないます。他のマシンもまた TF_CONFIG 環境変数を持つ必要があることに注意してください、そしてそれは同じ cluster dict を、しかしそれらのマシンの役割りが何であるかに依拠して異なるタスクタイプあるいはタスクインデックスを持つはずです。
説明目的で、このチュートリアルはローカルホスト上の 2 ワーカーで TF_CONFIG をどのように設定して良いかを示します。実際には、ユーザは外部 IP アドレス/ポート上でマルチワーカーを作成し、そして各ワーカー上で TF_CONFIG を適切に設定します。
os.environ['TF_CONFIG'] = json.dumps({ 'cluster': { 'worker': ["localhost:12345", "localhost:23456"] }, 'task': {'type': 'worker', 'index': 0} })
このサンプルでは学習率は固定される一方で、一般にはグローバルバッチサイズに基づいて学習率を調整する必要があるかもしれません。
正しいストラテジーを選択する
TensorFlow では、分散訓練は同期訓練、そこでは訓練ステップはワーカーとレプリカに渡り同期されます、そして非同期訓練、そこでは訓練ステップは厳密には同期されません、から成ります。
MultiWorkerMirroredStrategy、これは同期マルチワーカー訓練のために推奨されるストラテジーです、はこのガイドで実演されます。モデルを訓練するために、tf.distribute.experimental.MultiWorkerMirroredStrategy のインスタンスを使用します。MultiWorkerMirroredStrategy は総てのワーカーに渡り各デバイス上でモデルの層の総ての変数のコピーを作成します。それは勾配を累積して変数を同期して保持するために CollectiveOps、collective な通信のための TensorFlow op を使用します。tf.distribute.Strategy ガイド はこのストラテジーについてより詳細を持ちます。
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO INFO:tensorflow:Single-worker CollectiveAllReduceStrategy with local_devices = ('/device:GPU:0',), communication = CollectiveCommunication.AUTO
Note: MultiWorkerMirroredStrategy.init() が呼び出されるときに TF_CONFIG はパースされて TensorFlow の GRPC は開始されますので、TF_CONFIG 環境変数は tf.distribute.Strategy インスタンスが作成される前に設定されなければなりません。
MultiWorkerMirroredStrategy は CollectiveCommunication パラメータを通して複数の実装を提供します。RING は gRPC を使用して ring-based collective を cross-ホスト通信層として実装します。NCCL は collective を実装するために Nvidia の NCCL を使用します。AUTO は選択をランタイムに任せます。collective 実装の最善の選択は GPU の数と種類、そしてクラスタのネットワーク相互接続に依拠します。
モデルを MultiWorkerMirroredStrategy で訓練する
tf.distribute.Strategy API の tf.keras への統合では、訓練をマルチワーカーに分散するために行なう唯一の変更はモデル構築と model.compile() 呼び出しを strategy.scope() 内に包み込むことです。分散ストラテジーのスコープは変数がどのようにそしてどこで作成されるかを指示します、そして MultiWorkerMirroredStrategy の場合には、作成される変数は MirroredVariable で、それらはワーカーの各々で複製されます。
Note: 現在 MultiWorkerMirroredStrategy には制限があり、そこでは TensorFlow ops は strategy のインスタンスが作成された後に作成される必要があります。RuntimeError: Collective ops must be configured at program startup を見る場合、MultiWorkerMirroredStrategy のインスタンスをプログラムの最初で作成してみてください、そして ops を作成するかもしれないコードを strategy がインスタンス化された後に置きます。
Note: この Colab では、次のコードは期待される結果とともに動作できますが、けれどもこれは効果的なシングルワーカー訓練です、何故ならば TF_CONFIG が設定されていないからです。貴方自身のサンプルで TF_CONFIG をひとたび設定すれば、複数のマシン上の訓練によるスピードアップを期待できるはずです。
NUM_WORKERS = 2 # Here the batch size scales up by number of workers since # `tf.data.Dataset.batch` expects the global batch size. Previously we used 64, # and now this becomes 128. GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS with strategy.scope(): # Creation of dataset, and model building/compiling need to be within # `strategy.scope()`. train_datasets = make_datasets_unbatched().batch(GLOBAL_BATCH_SIZE) multi_worker_model = build_and_compile_cnn_model() multi_worker_model.fit(x=train_datasets, epochs=3)
Epoch 1/3 469/469 [==============================] - 9s 18ms/step - loss: 2.2174 - accuracy: 0.2142 Epoch 2/3 469/469 [==============================] - 1s 3ms/step - loss: 1.9502 - accuracy: 0.5165 Epoch 3/3 469/469 [==============================] - 1s 3ms/step - loss: 1.4742 - accuracy: 0.7417 <tensorflow.python.keras.callbacks.History at 0x7fc3a0123e48>
Dataset シャーディングとバッチサイズ
マルチワーカー訓練では、収束とパフォーマンスを確実にするためにデータを複数のパートにシャーディングすることが必要です。けれども、上のコードスニペットでは、データセットはシャーディングする必要なく model.fit() に直接送られていることに注意してください ; これは tf.distribute.Strategy API がマルチワーカー訓練でデータセットのシャーディングを自動的に対処するからです。
貴方の訓練のために手動のシャーディングを好むのであれば、自動シャーディングは tf.data.experimental.DistributeOptions api を通して無効にできます。具体的には、
options = tf.data.Options() options.experimental_distribute.auto_shard = False train_datasets_no_auto_shard = train_datasets.with_options(options)
注意すべきもう一つのことはデータセットのためのバッチサイズです。上のコードスニペットでは、GLOBAL_BATCH_SIZE = 64 * NUM_WORKERS を使用しています、これはそれがシングルワーカーのためのケースほどの大きさの NUM_WORKERS 倍です、何故ならば効果的なワーカー毎バッチサイズはワーカー数で除算されたグローバルバッチサイズです、そしてこの変更により前と同じワーカー毎バッチサイズを保持しています。
パフォーマンス
今は MultiWorkerMirroredStrategy によりマルチワーカーで動作するように総てセットアップされた Keras モデルを持ちます。マルチワーカー訓練のパフォーマンスを微調整するために次のテクニックを試すことができます。
- MultiWorkerMirroredStrategy は複数の collective 通信実装 を提供します。RING は gRPC を使用して ring-based collectives を cross-ホスト通信層として実装します。NCCL は collective を実装するために Nvidia の NCCL を使用します。AUTO は選択をランタイムに任せます。collective 実装の最善の選択は GPU の数と種類、そしてクラスタのネットワーク相互接続に依拠します。自動選択をオーバーライドするには、MultiWorkerMirroredStrategy のコンストラクタの communication パラメータに正当な値を指定します、e.g. communication=tf.distribute.experimental.CollectiveCommunication.NCCL。
- 可能であれば変数を tf.float にキャストします。公式の ResNet モデルはこれがどのように成されるかの サンプル を含みます。
フォールトトレランス
同期訓練では、ワーカーの一つが失敗して障害復旧メカニズムが存在しないのであれば、クラスタは失敗するでしょう。tf.distribute.Strategy で Keras を使用するとワーカーが予期せぬ停止をするかそうでないなら安定していない場合にフォールトトレランスの優位点を装備します。これを訓練状態を貴方が選択する分散ファイルシステムに保存することにより行ないます、その結果前に失敗したか無効にされたインスタンスの再開始時に訓練状態がリカバーされます。
総てのワーカーは訓練エポックとステップの観点から同期的に保持されますので、他のワーカーは失敗したか無効にされたワーカーが続行するために再開始されるのを待つ必要があります。
ModelCheckpoint コールバック
マルチワーカー訓練でフォールトトレランスを利用するために、tf.keras.Model.fit() 呼び出しで tf.keras.callbacks.ModelCheckpoint のインスタンスを提供します。コールバックは ModelCheckpoint の filepath 引数に対応するディレクトリにチェックポイントと訓練状態をストアします。
# Replace the `filepath` argument with a path in the file system # accessible by all workers. callbacks = [tf.keras.callbacks.ModelCheckpoint(filepath='/tmp/keras-ckpt')] with strategy.scope(): multi_worker_model = build_and_compile_cnn_model() multi_worker_model.fit(x=train_datasets, epochs=3, callbacks=callbacks)
Epoch 1/3 469/Unknown - 8s 18ms/step - loss: 2.2049 - accuracy: 0.2318WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1781: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. Instructions for updating: If using Keras pass *_constraint arguments to layers. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1781: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. Instructions for updating: If using Keras pass *_constraint arguments to layers. INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets 469/469 [==============================] - 9s 19ms/step - loss: 2.2049 - accuracy: 0.2318 Epoch 2/3 451/469 [===========================>..] - ETA: 0s - loss: 1.9195 - accuracy: 0.5715INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets 469/469 [==============================] - 2s 4ms/step - loss: 1.9113 - accuracy: 0.5767 Epoch 3/3 450/469 [===========================>..] - ETA: 0s - loss: 1.4175 - accuracy: 0.7550INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets INFO:tensorflow:Assets written to: /tmp/keras-ckpt/assets 469/469 [==============================] - 2s 4ms/step - loss: 1.4078 - accuracy: 0.7561 <tensorflow.python.keras.callbacks.History at 0x7fc38fdfee80>
ワーカーが無効にされる場合、クラスタ全体は無効にされたワーカーが再開始されるまで一時的に停止します。ひとたびワーカーがクラスタに再び加われば、他のワーカーも再開始します。今は、総てのワーカーは前にセーブされたチェックポイント・ファイルを読みそして前の状態をピックアップし、従ってクラスタに同期的に戻ることを可能にします。そして訓練は続行します。
ModelCheckpoint で指定した filepath を含むディレクトリを調べると、幾つかの一時的に生成されたチェックポイント・ファイルに気付くかもしれません。それらのファイルは前に失われたインスタンスをリカバーするために必要です、そしてそれらはマルチワーカー訓練の成功的な exit の時に tf.keras.Model.fit() の最後でライブラリにより削除されます。
See also
- Distributed Training in TensorFlow ガイドは利用可能な分散ストラテジーの概要を提供します。
- MirroredStrategy か MultiWorkerMirroredStrategy を使用して訓練できる、公式の ResNet50 モデル。
以上