ホーム » Keras » TensorFlow 2.4 : ガイド : Keras :- 訓練ループをスクラッチから書く

TensorFlow 2.4 : ガイド : Keras :- 訓練ループをスクラッチから書く

TensorFlow 2.0 : ガイド : Keras :- 訓練ループをスクラッチから書く (翻訳/解説)

翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 01/16/2021

* 本ページは、TensorFlow org サイトの Guide – Keras の以下のページを翻訳した上で
適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

無料セミナー実施中 クラスキャット主催 人工知能 & ビジネス Web セミナー

人工知能とビジネスをテーマにウェビナー (WEB セミナー) を定期的に開催しています。スケジュールは弊社 公式 Web サイト でご確認頂けます。
  • お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
  • Windows PC のブラウザからご参加が可能です。スマートデバイスもご利用可能です。
クラスキャットは人工知能・テレワークに関する各種サービスを提供しております :

人工知能研究開発支援 人工知能研修サービス テレワーク & オンライン授業を支援
PoC(概念実証)を失敗させないための支援 (本支援はセミナーに参加しアンケートに回答した方を対象としています。

お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。

株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/
Facebook: https://www.facebook.com/ClassCatJP/

 

ガイド : Keras :- 訓練ループをスクラッチから書く

セットアップ

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

 

イントロダクション

Keras はデフォルトの訓練と評価ループ、fit() と evaluate() を提供します、それらの使用方法はガイド 組込みメソッドで訓練と評価 でカバーされます。

(例えば、fit() を使用して GAN を訓練するために) fit() の便利さを依然として活用しながら貴方のモデルの学習アルゴリズムをカスタマイズすることを望む場合、Model クラスをサブクラス化して貴方自身の train_step() メソッドを実装できます、これは fit() の間に繰り返し呼び出されます。これはガイド Model.fit で起きることをカスタマイズする でカバーされます。

今は、訓練 & 評価に渡り非常に低位な制御を望む場合、貴方自身の訓練 & 評価ループをスクラッチから書くべきです。このガイドはこれについてです。

 

GradientTape を使用する : 最初の end-to-end サンプル

GradientTape スコープ内でのモデルの呼び出しは損失値に関する層の訓練可能な重みの勾配を取得することを可能にします。optimizer インスタンスを使用して、これらの変数 (これらは model.trainable_weights を使用して取得できます) を更新するためにこれらの勾配を使用できます。

単純な MNIST モデルを考えましょう :

inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)

カスタム訓練ループでミニバッチ勾配を使用してそれを訓練しましょう。

最初に、optimizer、損失関数、そしてデータセットを必要とします :

# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))

# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]

# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)

# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step

ここに私達の訓練ループがあります :

  • エポックに渡り反復する for ループを開始します。
  • 各エポックについて、データセットに渡り、バッチで反復する for ループを開始します。
  • 各バッチについて、GradientTape() スコープを開始します。
  • このスコープ内で、model (forward パス) を呼び出して損失を計算します。
  • スコープの外で、損失に関するモデルの重みの勾配を取得します。
  • 最後に、勾配に基づいてモデルの重みを更新するために optimizer を使用します。
epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):

        # Open a GradientTape to record the operations run
        # during the forward pass, which enables auto-differentiation.
        with tf.GradientTape() as tape:

            # Run the forward pass of the layer.
            # The operations that the layer applies
            # to its inputs are going to be recorded
            # on the GradientTape.
            logits = model(x_batch_train, training=True)  # Logits for this minibatch

            # Compute the loss value for this minibatch.
            loss_value = loss_fn(y_batch_train, logits)

        # Use the gradient tape to automatically retrieve
        # the gradients of the trainable variables with respect to the loss.
        grads = tape.gradient(loss_value, model.trainable_weights)

        # Run one step of gradient descent by updating
        # the value of the variables to minimize the loss.
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %s samples" % ((step + 1) * 64))
Start of epoch 0
Training loss (for one batch) at step 0: 96.9620
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.8153
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.1619
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 1.1577
Seen so far: 38464 samples

Start of epoch 1
Training loss (for one batch) at step 0: 0.8497
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.6811
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.0251
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.6824
Seen so far: 38464 samples

 

メトリクスの低位処理

この基本的なループにメトリクス監視を追加しましょう。

スクラッチから書かれたそのような訓練ループで組込みメトリクス (or 貴方が書いたカスタムメトリクス) を容易に再利用できます。ここにフローがあります :

  • ループの開始でメトリックをインスタンス化します。
  • 各バッチ後に metric.update_state() を呼び出します。
  • メトリックの現在の値を表示する必要があるとき metric.result() を呼び出します。
  • メトリックの状態をクリアする必要があるとき (典型的にはエポックの最後) metric.reset_states() を呼び出します。

各エポックの最後に検証データ上 SparseCategoricalAccuracy を計算するためにこの知識を利用しましょう :

# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)

# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()

ここに私達の訓練 & 評価ループがあります :

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        with tf.GradientTape() as tape:
            logits = model(x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))

        # Update training metric.
        train_acc_metric.update_state(y_batch_train, logits)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        val_logits = model(x_batch_val, training=False)
        # Update val metrics
        val_acc_metric.update_state(y_batch_val, val_logits)
    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 112.9925
Seen so far: 64 samples
Training loss (for one batch) at step 200: 1.3940
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 1.2486
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 1.9907
Seen so far: 38464 samples
Training acc over epoch: 0.6960
Validation acc: 0.7993
Time taken: 5.56s

Start of epoch 1
Training loss (for one batch) at step 0: 0.5860
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.9344
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.5471
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.6339
Seen so far: 38464 samples
Training acc over epoch: 0.8266
Validation acc: 0.8626
Time taken: 5.49s

 

貴方の訓練ステップを tf.function でスピードアップする

TensorFlow 2.0 のデフォルトのランタイムは eager 実行 です。そのようなものとして、上の訓練ループは eagerly に実行されます。

これはデバッグのためには素晴らしいですが、グラフコンパイルは明らかなパフォーマンス優位を持ちます。貴方の計算を静的グラフとして記述することはフレームワークがグローバルなパフォーマンス最適化を適用することを有効にします。これはフレームワークが (次に何が来るかの知識なしに) 1 つの演算を他の 1 つの後に貪欲に実行するように制約されているときには不可能です。

入力として tensor を取る任意の関数を静的グラフにコンパイルできます。その上に @tf.function デコレータを単に追加します、このようにです :

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

評価ステップでも同じことを行ないましょう :

@tf.function
def test_step(x, y):
    val_logits = model(x, training=False)
    val_acc_metric.update_state(y, val_logits)

今は、訓練ループをこのコンパイルされた訓練ステップで再実行しましょう :

import time

epochs = 2
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))
    start_time = time.time()

    # Iterate over the batches of the dataset.
    for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
        loss_value = train_step(x_batch_train, y_batch_train)

        # Log every 200 batches.
        if step % 200 == 0:
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            print("Seen so far: %d samples" % ((step + 1) * 64))

    # Display metrics at the end of each epoch.
    train_acc = train_acc_metric.result()
    print("Training acc over epoch: %.4f" % (float(train_acc),))

    # Reset training metrics at the end of each epoch
    train_acc_metric.reset_states()

    # Run a validation loop at the end of each epoch.
    for x_batch_val, y_batch_val in val_dataset:
        test_step(x_batch_val, y_batch_val)

    val_acc = val_acc_metric.result()
    val_acc_metric.reset_states()
    print("Validation acc: %.4f" % (float(val_acc),))
    print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0
Training loss (for one batch) at step 0: 0.8467
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.4002
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.7801
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.3799
Seen so far: 38464 samples
Training acc over epoch: 0.8683
Validation acc: 0.8857
Time taken: 1.38s

Start of epoch 1
Training loss (for one batch) at step 0: 0.7573
Seen so far: 64 samples
Training loss (for one batch) at step 200: 0.3093
Seen so far: 12864 samples
Training loss (for one batch) at step 400: 0.3223
Seen so far: 25664 samples
Training loss (for one batch) at step 600: 0.7105
Seen so far: 38464 samples
Training acc over epoch: 0.8869
Validation acc: 0.8948
Time taken: 1.03s

Much faster, isn’t it?

 

モデルにより追跡される損失の低位処理

層 & モデルは self.add_loss(value) を呼び出す層により forward パスの間に作成された任意の損失を再帰的に追跡します。スカラー損失値の結果としてのリストは forward パスの最後にプロパティ model.losses を通して利用可能です。

これらの損失成分を使用することを望む場合、訓練ステップ内でそれらを合計してそれらを主要損失に追加するべきです。

この層を考えます、これは activity 正則化損失を作成します :

class ActivityRegularizationLayer(layers.Layer):
    def call(self, inputs):
        self.add_loss(1e-2 * tf.reduce_sum(inputs))
        return inputs

それを使用する実際に単純なモデルを構築しましょう :

inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)

model = keras.Model(inputs=inputs, outputs=outputs)

ここに今は訓練ステップがどのように見えるべきかがあります :

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss_value = loss_fn(y, logits)
        # Add any extra losses created during the forward pass.
        loss_value += sum(model.losses)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    train_acc_metric.update_state(y, logits)
    return loss_value

 

要約

今では組込み訓練ループを使用することと貴方自身のものをスクラッチから書くことについて知るべき (存在する) 総てのことを貴方は知っています。

結論として、ここに単純な end-to-end サンプルがあります、これはこのガイドで学習した総てを一緒に結び付けています : MNIST 数字上で訓練された DCGAN です。

 

End-to-end サンプル : スクラッチからの GAN 訓練ループ

貴方は敵対的生成ネットワーク (GAN, Generative Adversarial Networks) に馴染みがあるかもしれません。GAN は画像の訓練データセットの潜在的分布を学習することにより (画像の「潜在的空間 (= latent space)」)、殆どリアルに見える新しい画像を生成できます。

GAN は 2 つのパートから成ります : “generator” モデル、これは潜在的空間のポイントを画像空間のポイントにマップし、そして “discrimintor” モデル、分類器は (訓練データセットからの) リアル画像と (generator ネットワークの出力) フェイク画像の間の違いを識別できます。

GAN 訓練はこのようなものです :

1) discriminator を訓練する。- 潜在的空間のランダムポイントのバッチをサンプリングします。- ポイントを “generator” モデルを通してフェイク画像に変えます。- リアル画像のバッチを得てそれらを生成された画像と結合します。- 生成された (画像) vs. リアル画像を分類するために “discriminator” モデルを訓練します。

2) generator を訓練する。- 潜在的空間のランダムポイントをサンプリングします。- ポイントを “generator” ネットワークを通してフェイク画像に変えます。- リアル画像のバッチを得てそれらを生成された画像と結合します。- discriminator を「騙して (= fool)」そしてフェイク画像をリアルとして分類するように “generator” モデルを訓練します。

GAN がどのように動作するかの遥かに詳細な概要については、Deep Learning with Python を見てください。

この訓練ループを実装しましょう。最初に、フェイク vs リアル数字を分類することを意図した discriminator を作成します :

discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)
discriminator.summary()
Model: "discriminator"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv2d (Conv2D)              (None, 14, 14, 64)        640       
_________________________________________________________________
leaky_re_lu (LeakyReLU)      (None, 14, 14, 64)        0         
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 7, 7, 128)         73856     
_________________________________________________________________
leaky_re_lu_1 (LeakyReLU)    (None, 7, 7, 128)         0         
_________________________________________________________________
global_max_pooling2d (Global (None, 128)               0         
_________________________________________________________________
dense_4 (Dense)              (None, 1)                 129       
=================================================================
Total params: 74,625
Trainable params: 74,625
Non-trainable params: 0
_________________________________________________________________

そして generator ネットワークを生成しましょう、それは潜在ベクトルを (MNIST 数字を表す) shape (28, 28, 1) の出力に変えます :

latent_dim = 128

generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

ここにキービット (= key bit) があります : 訓練ループです。見れるようにそれは非常に簡単です。訓練ステップ関数は 17 行だけを取ります。

# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)

# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)


@tf.function
def train_step(real_images):
    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Decode them to fake images
    generated_images = generator(random_latent_vectors)
    # Combine them with real images
    combined_images = tf.concat([generated_images, real_images], axis=0)

    # Assemble labels discriminating real from fake images
    labels = tf.concat(
        [tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
    )
    # Add random noise to the labels - important trick!
    labels += 0.05 * tf.random.uniform(labels.shape)

    # Train the discriminator
    with tf.GradientTape() as tape:
        predictions = discriminator(combined_images)
        d_loss = loss_fn(labels, predictions)
    grads = tape.gradient(d_loss, discriminator.trainable_weights)
    d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))

    # Sample random points in the latent space
    random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
    # Assemble labels that say "all real images"
    misleading_labels = tf.zeros((batch_size, 1))

    # Train the generator (note that we should *not* update the weights
    # of the discriminator)!
    with tf.GradientTape() as tape:
        predictions = discriminator(generator(random_latent_vectors))
        g_loss = loss_fn(misleading_labels, predictions)
    grads = tape.gradient(g_loss, generator.trainable_weights)
    g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
    return d_loss, g_loss, generated_images

画像のバッチ上で train_step を繰り返し呼び出すことにより、私達の GAN を訓練しましょう。discriminator と generator は convnet (畳込みネットワーク) ですので、このコードを GPU 上で実行することを望むはずです。

import os

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

epochs = 1  # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"

for epoch in range(epochs):
    print("\nStart epoch", epoch)

    for step, real_images in enumerate(dataset):
        # Train the discriminator & generator on one batch of real images.
        d_loss, g_loss, generated_images = train_step(real_images)

        # Logging.
        if step % 200 == 0:
            # Print metrics
            print("discriminator loss at step %d: %.2f" % (step, d_loss))
            print("adversarial loss at step %d: %.2f" % (step, g_loss))

            # Save one generated image
            img = tf.keras.preprocessing.image.array_to_img(
                generated_images[0] * 255.0, scale=False
            )
            img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))

        # To limit execution time we stop after 10 steps.
        # Remove the lines below to actually train the model!
        if step > 10:
            break
Start epoch 0
discriminator loss at step 0: 0.70
adversarial loss at step 0: 0.68

That’s it! Colab GPU 上で単なる ~30s の訓練後に見栄えの良いフェイク MNIST 数字を得るでしょう。

 

以上



AI導入支援 #2 ウェビナー

スモールスタートを可能としたAI導入支援   Vol.2
[無料 WEB セミナー] [詳細]
「画像認識 AI PoC スターターパック」の紹介
既に AI 技術を実ビジネスで活用し、成果を上げている日本企業も多く存在しており、競争優位なビジネスを展開しております。
しかしながら AI を導入したくとも PoC (概念実証) だけでも高額な費用がかかり取組めていない企業も少なくないようです。A I導入時には欠かせない PoC を手軽にしかも短期間で認知度を確認可能とするサービの紹介と共に、AI 技術の特性と具体的な導入プロセスに加え運用時のポイントについても解説いたします。
日時:2021年10月13日(水)
会場:WEBセミナー
共催:クラスキャット、日本FLOW(株)
後援:働き方改革推進コンソーシアム
参加費: 無料 (事前登録制)
人工知能開発支援
◆ クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
(株)クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com