TensorFlow 2.0 : 上級 Tutorials : 分散訓練 :- tf.distribute.Strategy でカスタム訓練 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 10/30/2019
* 本ページは、TensorFlow org サイトの TF 2.0 – Advanced Tutorials – Distributed training の以下のページを翻訳した上で
適宜、補足説明したものです:
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
- Windows PC のブラウザからご参加が可能です。スマートデバイスもご利用可能です。
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション |
E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/ |
Facebook: https://www.facebook.com/ClassCatJP/ |
分散訓練 :- tf.distribute.Strategy でカスタム訓練
このチュートリアルはカスタム訓練ループで tf.distribute.Strategy をどのように使用するかを実演します。fashion MNIST データセット上で単純な CNN モデルを訓練します。fashion MNIST データセットはサイズ 28 x 28 の 60000 訓練画像とサイズ 28 x 28 の 10000 テスト画像を含みます。
モデルを訓練するためにカスタム訓練ループを使用しています、何故ならばそれらは訓練の上で柔軟性とより良い制御を与えてくれるからです。更に、モデルと訓練ループをデバッグすることを容易にします。
from __future__ import absolute_import, division, print_function, unicode_literals # Import TensorFlow import tensorflow as tf # Helper libraries import numpy as np import os print(tf.__version__)
2.0.0
fashion mnist データセットをダウンロードする
fashion_mnist = tf.keras.datasets.fashion_mnist (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() # Adding a dimension to the array -> new shape == (28, 28, 1) # We are doing this because the first layer in our model is a convolutional # layer and it requires a 4D input (batch_size, height, width, channels). # batch_size dimension will be added later on. train_images = train_images[..., None] test_images = test_images[..., None] # Getting the images in [0, 1] range. train_images = train_images / np.float32(255) test_images = test_images / np.float32(255)
変数とグラフを分散するためのストラテジーを作成する
tf.distribute.MirroredStrategy ストラテジーはどのように動作するのでしょう?
- 総ての変数とモデル・グラフはレプリカ上で複製されます。
- 入力はレプリカに渡り均等に分散されます。
- 各レプリカはそれが受け取った入力のための損失と勾配を計算します。
- 勾配はそれらを合計することにより総てのレプリカに渡り同期されます。
- 同期後、同じ更新が各レプリカ上の変数のコピーに行われます。
Note: 下の総てのコードを単一のスコープの内側に配置できます。説明目的でそれを幾つかのコードセルに分割しています。
# If the list of devices is not specified in the # `tf.distribute.MirroredStrategy` constructor, it will be auto-detected. strategy = tf.distribute.MirroredStrategy()
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
入力パイプラインをセットアップする
グラフと変数をプラットフォーム不可知な SavedModel フォーマットにエクスポートします。モデルがセーブされた後、スコープとともに、あるいはスコープなしでそれをロードできます。
BUFFER_SIZE = len(train_images) BATCH_SIZE_PER_REPLICA = 64 GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync EPOCHS = 10
データセットを作成してそれらを分散します :
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE) test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE) train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset) test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
モデル作成
tf.keras.Sequential を使用してモデルを作成します。これを行なうために Model Subclassing API を使用することもできます。
def create_model(): model = tf.keras.Sequential([ tf.keras.layers.Conv2D(32, 3, activation='relu'), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Conv2D(64, 3, activation='relu'), tf.keras.layers.MaxPooling2D(), tf.keras.layers.Flatten(), tf.keras.layers.Dense(64, activation='relu'), tf.keras.layers.Dense(10, activation='softmax') ]) return model
# Create a checkpoint directory to store the checkpoints. checkpoint_dir = './training_checkpoints' checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
損失関数を定義する
通常は、1 GPU/CPU を持つ単一マシン上では、損失は入力のバッチのサンプル数により除算されます。
それでは、tf.distribute.Strategy を使用するとき損失はどのように計算されるべきでしょう?
- 例えば、貴方は 4 GPU と 64 のバッチサイズを持つとしましょう。入力の一つのバッチはレプリカ (4 GPU) に渡り分散され、各レプリカはサイズ 16 の入力を得ます。
- 各レプリカ上のモデルはそれぞれの入力で forward パスを行ないそして損失を計算します。今は、損失をそれぞれの入力のサンプル数 (BATCH_SIZE_PER_REPLICA = 16) で除算する代わりに、損失は GLOBAL_BATCH_SIZE (64) で除算されるべきです。
何故これを行なうのでしょう?
- これが行われるのは各レプリカ上で勾配が計算された後、それらを総計することによりそれらはレプリカに渡り同期されるからです。
TensorFlow でこれをどのようにに処理しますか? * カスタム訓練ループを書いている場合、このチュートリアルでのように、サンプル毎損失を合計して GLOBAL_BATCH_SIZE で合計を除算するべきです: scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE) あるいは tf.nn.compute_average_loss を使用できます、これは引数としてサンプル毎損失、オプションのサンプル重み、そして GLOBAL_BATCH_SIZE を取りそしてスケールされた損失を返します。
- 貴方のモデルで regularization 損失を使用している場合には、損失値をレプリカの数でスケールする必要があります。tf.nn.scale_regularization_loss 関数を使用してこれを行なうことができます。
- tf.reduce_mean の使用は推奨されません。それを行なうことは損失を実際のレプリカ毎バッチサイズで除算します、これはステップ毎に変化するかもしれません。
- このリダクションとスケーリングは keras model.compile と model.fit で自動的に行なわれます。
- (下のサンプルのように) tf.keras.losses クラスを使用する場合、損失リダクションは NONE か SUM の一つとして明示的に指定される必要があります。 tf.distribute.Strategy で使用されるとき AUTO と SUM_OVER_BATCH_SIZE は許容されません。AUTO は許容されません、何故ならば分散ケースでそれが正しいことを確実にするためにユーザはどのリダクションを望むかについて明示的に考えるべきだからです。SUM_OVER_BATCH_SIZE は許容されません、何故ならば現在それはレプリカ毎バッチサイズで除算するだけだからです、そしてレプリカ数による除算はユーザに委ねられます、これは簡単にミスしてしまうかもしれません。それで代わりにユーザが彼ら自身でリダクションを行なうかを明示的に尋ねます。
with strategy.scope(): # Set reduction to `none` so we can do the reduction afterwards and divide by # global batch size. loss_object = tf.keras.losses.SparseCategoricalCrossentropy( reduction=tf.keras.losses.Reduction.NONE) # or loss_fn = tf.keras.losses.sparse_categorical_crossentropy def compute_loss(labels, predictions): per_example_loss = loss_object(labels, predictions) return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
損失と精度を追跡するためにメトリクスを定義する
これらのメトリクスは損失及び訓練とテスト精度を追跡します。累積された統計情報を得るためにいつでも .result() を使用できます。
with strategy.scope(): test_loss = tf.keras.metrics.Mean(name='test_loss') train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='train_accuracy') test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='test_accuracy')
訓練ループ
# model and optimizer must be created under `strategy.scope`. with strategy.scope(): model = create_model() optimizer = tf.keras.optimizers.Adam() checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
with strategy.scope(): def train_step(inputs): images, labels = inputs with tf.GradientTape() as tape: predictions = model(images, training=True) loss = compute_loss(labels, predictions) gradients = tape.gradient(loss, model.trainable_variables) optimizer.apply_gradients(zip(gradients, model.trainable_variables)) train_accuracy.update_state(labels, predictions) return loss def test_step(inputs): images, labels = inputs predictions = model(images, training=False) t_loss = loss_object(labels, predictions) test_loss.update_state(t_loss) test_accuracy.update_state(labels, predictions)
with strategy.scope(): # `experimental_run_v2` replicates the provided computation and runs it # with the distributed input. @tf.function def distributed_train_step(dataset_inputs): per_replica_losses = strategy.experimental_run_v2(train_step, args=(dataset_inputs,)) return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) @tf.function def distributed_test_step(dataset_inputs): return strategy.experimental_run_v2(test_step, args=(dataset_inputs,)) for epoch in range(EPOCHS): # TRAIN LOOP total_loss = 0.0 num_batches = 0 for x in train_dist_dataset: total_loss += distributed_train_step(x) num_batches += 1 train_loss = total_loss / num_batches # TEST LOOP for x in test_dist_dataset: distributed_test_step(x) if epoch % 2 == 0: checkpoint.save(checkpoint_prefix) template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, " "Test Accuracy: {}") print (template.format(epoch+1, train_loss, train_accuracy.result()*100, test_loss.result(), test_accuracy.result()*100)) test_loss.reset_states() train_accuracy.reset_states() test_accuracy.reset_states()
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). Epoch 1, Loss: 0.5149696469306946, Accuracy: 81.5250015258789, Test Loss: 0.37859389185905457, Test Accuracy: 86.23999786376953 Epoch 2, Loss: 0.33567535877227783, Accuracy: 87.95333862304688, Test Loss: 0.34586820006370544, Test Accuracy: 87.48999786376953 Epoch 3, Loss: 0.29027265310287476, Accuracy: 89.45333099365234, Test Loss: 0.3082132041454315, Test Accuracy: 89.19000244140625 Epoch 4, Loss: 0.25970742106437683, Accuracy: 90.52000427246094, Test Loss: 0.2932600975036621, Test Accuracy: 89.62000274658203 Epoch 5, Loss: 0.23640099167823792, Accuracy: 91.24833679199219, Test Loss: 0.2744646370410919, Test Accuracy: 89.93000030517578 Epoch 6, Loss: 0.21604427695274353, Accuracy: 92.07999420166016, Test Loss: 0.2818734049797058, Test Accuracy: 89.81999969482422 Epoch 7, Loss: 0.19880548119544983, Accuracy: 92.59500122070312, Test Loss: 0.2676812708377838, Test Accuracy: 90.24000549316406 Epoch 8, Loss: 0.1839067041873932, Accuracy: 93.10166931152344, Test Loss: 0.24938379228115082, Test Accuracy: 90.88999938964844 Epoch 9, Loss: 0.16903717815876007, Accuracy: 93.7733383178711, Test Loss: 0.25704431533813477, Test Accuracy: 90.75 Epoch 10, Loss: 0.15608058869838715, Accuracy: 94.10832977294922, Test Loss: 0.2633570432662964, Test Accuracy: 90.6500015258789
上のサンプルで注意すべきことは :
- “for x in …” 構成要素を使用して train_dist_dataset と test_dist_dataset に渡り iterate しています。
- スケールされた損失は distributed_train_step の戻り値です。この値は、tf.distribute.Strategy.reduce 呼び出しを使用してレプリカに渡り、それから tf.distribute.Strategy.reduce 呼び出しの戻り値を合計することによりバッチに渡り累積されます。
- tf.keras.Metrics は tf.distribute.Strategy.experimental_run_v2 により実行される train_step and test_step 内で更新されるべきです。* tf.distribute.Strategy.experimental_run_v2 はストラテジーで各ローカルレプリカからの結果を返し、そしてこの結果を消費する複数の方法があります。累積値を得るために tf.distribute.Strategy.reduce を行なうことができます。ローカルレプリカ毎に一つ、結果に含まれる値のリストを得るために tf.distribute.Strategy.experimental_local_results を行なうこともできます。
最新のチェックポイントを復元してテストする
tf.distribute.Strategy でチェックポイントされたモデルはストラテジーとともに、またはストラテジーなしでリストアできます。
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy( name='eval_accuracy') new_model = create_model() new_optimizer = tf.keras.optimizers.Adam() test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function def eval_step(images, labels): predictions = new_model(images, training=False) eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model) checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir)) for images, labels in test_dataset: eval_step(images, labels) print ('Accuracy after restoring the saved model without strategy: {}'.format( eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 90.75
データセットに渡り iterate する別の方法
iterator を使用する
与えられた数のステップに渡り、そしてデータセット全体を通すのではなく iterate することを望む場合、iterator 上で iter 呼び出しと明示的に呼び出し next を使用して iterator を作成することができます。tf.functiuon の内側と外側の両者でデータセットに渡り iterate することを選択できます。ここにiterator を使用して tf.function の外側でデータセットの iteration を実演する小さいスニペットがあります。
with strategy.scope(): for _ in range(EPOCHS): total_loss = 0.0 num_batches = 0 train_iter = iter(train_dist_dataset) for _ in range(10): total_loss += distributed_train_step(next(train_iter)) num_batches += 1 average_train_loss = total_loss / num_batches template = ("Epoch {}, Loss: {}, Accuracy: {}") print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100)) train_accuracy.reset_states()
Epoch 10, Loss: 0.13162840902805328, Accuracy: 94.53125 Epoch 10, Loss: 0.11109195649623871, Accuracy: 96.5625 Epoch 10, Loss: 0.1300327479839325, Accuracy: 94.6875 Epoch 10, Loss: 0.102020762860775, Accuracy: 96.25 Epoch 10, Loss: 0.09842313826084137, Accuracy: 97.03125 Epoch 10, Loss: 0.13631784915924072, Accuracy: 94.375 Epoch 10, Loss: 0.11527981609106064, Accuracy: 95.0 Epoch 10, Loss: 0.09563014656305313, Accuracy: 96.09375 Epoch 10, Loss: 0.13050365447998047, Accuracy: 94.6875 Epoch 10, Loss: 0.1324475109577179, Accuracy: 94.6875
tf.function の内側で iterate する
“for x in …” 構成要素を使用するか上で行なったように iterator を作成することにより tf.function の内側で入力 train_dist_dataset 全体に渡り iterate することもできます。下のサンプルは tf.function 内で訓練の 1 エポックをラップして関数の内側で train_dist_dataset に渡り iterate することを実演します。
with strategy.scope(): @tf.function def distributed_train_epoch(dataset): total_loss = 0.0 num_batches = 0 for x in dataset: per_replica_losses = strategy.experimental_run_v2(train_step, args=(x,)) total_loss += strategy.reduce( tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None) num_batches += 1 return total_loss / tf.cast(num_batches, dtype=tf.float32) for epoch in range(EPOCHS): train_loss = distributed_train_epoch(train_dist_dataset) template = ("Epoch {}, Loss: {}, Accuracy: {}") print (template.format(epoch+1, train_loss, train_accuracy.result()*100)) train_accuracy.reset_states()
Epoch 1, Loss: 0.14456823468208313, Accuracy: 94.57666778564453 Epoch 2, Loss: 0.1337885707616806, Accuracy: 94.8949966430664 Epoch 3, Loss: 0.12238477170467377, Accuracy: 95.37332916259766 Epoch 4, Loss: 0.11225613951683044, Accuracy: 95.75333404541016 Epoch 5, Loss: 0.10508091002702713, Accuracy: 96.11000061035156 Epoch 6, Loss: 0.09647118300199509, Accuracy: 96.38666534423828 Epoch 7, Loss: 0.09062784165143967, Accuracy: 96.57666778564453 Epoch 8, Loss: 0.08338376134634018, Accuracy: 96.91000366210938 Epoch 9, Loss: 0.07607696950435638, Accuracy: 97.0999984741211 Epoch 10, Loss: 0.06851287186145782, Accuracy: 97.46500396728516
レプリカに渡る訓練損失を追跡する
Note: 一般的なルールとして、サンプル毎の値を追跡してレプリカ内に累積された値を回避するために tf.keras.Metrics を使用するべきです。
実行される損失スケーリング計算ゆえに、異なるレプリカに渡る訓練損失を追跡するために tf.metrics.Mean を使用することを勧めません。
例えば、次の特性で訓練ジョブを実行する場合 : * 2 つのレプリカ * 2 つのサンプルが各レプリカで処理されます * 結果としての損失値: 各レプリカ上で [2, 3] と [4, 5] * グローバル・バッチサイズ = 4
損失スケーリングで、損失値を可算して、それからグローバル・バッチサイズで除算することにより各レプリカ上の損失のサンプル毎値を計算します。この場合、(2 + 3) / 4 = 1.25 と (4 + 5) / 4 = 2.25。
2 つのレプリカに渡り損失を追跡するために tf.metrics.Mean を使用する場合、結果は異なります。このサンプルでは、3.50 の total と 2 の count で終わり、これはメトリックで result() が呼び出されるとき total/count = 1.75 という結果になります。tf.keras.Metrics で計算される損失は同期するレプリカの数に等しい追加の因子でスケールされます。
サンプルとチュートリアル
ここにカスタム訓練ループで分散ストラテジーを使用する幾つかのサンプルがあります :
- MirroredStrategy を使用して MNIST を訓練する チュートリアル。
- MirroredStrategy を使用する DenseNet サンプル。
- MirroredStrategy and TPUStrategy を使用して訓練される BERT サンプル。このサンプルはどのようにチェックポイントからロードするかそして分散訓練の間に定期的にチェックポイントを生成するか等を理解するために特に有用です。
- MirroredStrategy を使用して訓練される NCF サンプル、これは keras_use_ctl フラグを使用して有効にできます。
- MirroredStrategy を使用して訓練される NMT サンプル。
分散ストラテジー・ガイド でリストされるより多くのサンプル
Next steps
貴方のモデル上で tf.distribute.Strategy API を試してください。
以上