ホーム » Keras » TensorFlow 2.4 : ガイド : Keras :- Model.fit で起きることをカスタマイズする

TensorFlow 2.4 : ガイド : Keras :- Model.fit で起きることをカスタマイズする

TensorFlow 2.0 : ガイド : Keras :- Model.fit で起きることをカスタマイズする (翻訳/解説)

翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 01/15/2021

* 本ページは、TensorFlow org サイトの Guide – Keras の以下のページを翻訳した上で
適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

無料セミナー実施中 クラスキャット主催 人工知能 & ビジネス Web セミナー

人工知能とビジネスをテーマにウェビナー (WEB セミナー) を定期的に開催しています。スケジュールは弊社 公式 Web サイト でご確認頂けます。
  • お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
  • Windows PC のブラウザからご参加が可能です。スマートデバイスもご利用可能です。
クラスキャットは人工知能・テレワークに関する各種サービスを提供しております :

人工知能研究開発支援 人工知能研修サービス テレワーク & オンライン授業を支援
PoC(概念実証)を失敗させないための支援 (本支援はセミナーに参加しアンケートに回答した方を対象としています。

お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。

株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/
Facebook: https://www.facebook.com/ClassCatJP/

 

ガイド : Keras :- Model.fit で起きることをカスタマイズする

イントロダクション

教師あり学習を行なっているとき、fit() を使用できて総てがスムースに動作します。

貴方自身の訓練ループをスクラッチから書く必要があるとき、GradientTape を利用して総ての小さな詳細を制御できます。

しかしカスタム訓練アルゴリズムを必要としながら、コールバック、組込みの分散サポート、あるはステップ融合 (= fusing) のような fit() の便利な特徴から依然として恩恵を受けることを望む場合にはどうでしょう?

Keras の中心的な原理は 複雑さの進歩的な (= progressive) 公開 です。貴方は低位ワークフローに漸進的に進むことができるはずです。高位機能が貴方のユースケースに正確には適合しない場合、崖から落ちるべきではありません。高位の同等の便利さを維持しながら詳細に渡る制御を得ることができるはずです。

fit() が行なうことをカスタマイズする必要があるとき、Model クラスの訓練ステップ関数を override する べきです。これはデータの総てのバッチのために fit() により呼び出される関数です。それから通常のように fit() を呼び出すことができます — そしてそれは貴方自身の学習アルゴリズムを実行しているでしょう。

このパターンはモデルを Functional API で構築することを妨げないことに注意してください。貴方がシーケンシャル・モデル、Functional API モデル、あるいはサブクラス化モデルを構築していようが、これを行なうことができます。

それがどのように動作するか見ましょう。

 

セットアップ

TensorFlow 2.2 かそれ以後を必要とします。

import tensorflow as tf
from tensorflow import keras

 

最初の単純なサンプル

単純なサンプルから始めましょう :

  • keras.Model をサブクラス化する新しいクラスを作成します。
  • メソッド train_step(self, data) を単に override します。
  • (損失を含む) メトリック名をそれらの現在の値にマップする辞書を返します。

入力引数 data は訓練データとして fit に渡されるものです :

  • fit(x, y, …) を呼び出すことにより、Numpy 配列を渡す場合には、データはタプル (x, y) です。
  • fit(dataset, …) を呼び出すことにより、tf.data.Dataset を渡す場合には、データは各バッチでデータセットにより yield されるものになります。

train_step メソッドの本体で、既に馴染みがあるものに類似した通常の訓練更新を実装します。重要なことは、self.compiled_loss を通して損失を計算する ことです、これは compile() に渡された損失関数をラップします。

同様に、compile() に渡されたメトリクスの状態を更新するために self.compiled_metrics.update_state(y, y_pred) を呼び出し、そしてそれらの現在の値を取得するために最後に self.metrics からの結果を問い合わせます。

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

Let’s try this out:

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 1ms/step - loss: 0.4041 - mae: 0.5120
Epoch 2/3
32/32 [==============================] - 0s 958us/step - loss: 0.2610 - mae: 0.4111
Epoch 3/3
32/32 [==============================] - 0s 914us/step - loss: 0.2933 - mae: 0.4311
<tensorflow.python.keras.callbacks.History at 0x7f1e109407b8>

 

低位に進む

自然に、compile() で損失関数を渡すことを単にスキップし、そして代わりに train_step で手動で総てを行なうことができるでしょう。メトリクスのためのようにです。

ここに低位サンプルがあります、それは optimizer を configure するために compile() を使用するだけです :

  • 損失と MAE スコアを追跡するために Metric インスタンスを作成することから始めます。
  • (update_state() を呼び出して) これらのメトリクスの状態を更新するカスタム train_step() を実装してから、進捗バーにより表示されて任意のコールバックに渡される (result() を通して) 現在の平均値を返すようにそれらに問い合わせます。
  • 各エポック間でメトリクス上 reset_states() を呼び出す必要があることに注意してください!そうでなければ result() は訓練の開始からの平均を返すでしょう、その一方で通常はエポック毎平均で作業します。ありがたいことに、フレームワークはそれを私達のために行なうことができます : モデルの metrics プロパティでリセットすることを望む任意のメトリックを単にリストします。モデルは、各 fit() エポックの最初か evaluate() への呼び出しの最初にここでリストされた任意のオブジェクト上で reset_states を呼び出します。
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 992us/step - loss: 0.3279 - mae: 0.4593
Epoch 2/5
32/32 [==============================] - 0s 876us/step - loss: 0.2278 - mae: 0.3866
Epoch 3/5
32/32 [==============================] - 0s 899us/step - loss: 0.2207 - mae: 0.3808
Epoch 4/5
32/32 [==============================] - 0s 969us/step - loss: 0.2131 - mae: 0.3746
Epoch 5/5
32/32 [==============================] - 0s 978us/step - loss: 0.2058 - mae: 0.3681

 

sample_weight & class_weight をサポートする

最初の基本的なサンプルはサンプル重み付けにどのような言及もしていないことに気付いたかもしれません。fit() 引数 sample_weight と class_weight をサポートすることを望む場合、単純に以下を行なうでしょう :

  • data 引数から sample_weight をアンパックする
  • それを compiled_loss & compiled_metrics に渡す (もちろん、損失 & メトリクスのために compile() に依拠しない場合、単にそれを手動で適用することもできるでしょう)
  • That’s it. That’s the list.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 1ms/step - loss: 1.1263 - mae: 1.3582
Epoch 2/3
32/32 [==============================] - 0s 973us/step - loss: 0.5116 - mae: 0.8744
Epoch 3/3
32/32 [==============================] - 0s 902us/step - loss: 0.2700 - mae: 0.5867
<tensorflow.python.keras.callbacks.History at 0x7f1e0c8ce3c8>

 

貴方自身の評価ステップを提供する

model.evaluate() への呼び出しのために同じことを行なうことを望むとすればどうでしょう?それならば正確に同じ方法で test_step を override するでしょう。ここにそれがどのように見えるかがあります :

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 857us/step - loss: 0.4283 - mae: 0.5305
[0.41645824909210205, 0.5266892313957214]

 

仕上げる : end-to-end GAN サンプル

貴方が丁度学習した総てを活用する end-to-end サンプルをウォークスルーしましょう。

以下を考えましょう :

  • 28x28x1 画像を生成することを意図した generator ネットワーク。
  • 28x28x1 画像を 2 つのクラス (“fake” と “real”) に分類することを意図した discriminator ネットワーク。
  • 各々に 1 つの optimizer。
  • discriminator を訓練するための損失関数。
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

ここに機能完全な GAN クラスがあります、それ自身のシグネチャを使用するために compile() を override し、そして train_step の 17 行で GAN アルゴリズム全体を実装しています :

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

それを試運転しましょう :

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
100/100 [==============================] - 9s 24ms/step - d_loss: 0.4919 - g_loss: 0.8347
<tensorflow.python.keras.callbacks.History at 0x7f8b38026780>

The ideas behind deep learning are simple, so why should their implementation be painful?

 

以上



AI導入支援 #2 ウェビナー

スモールスタートを可能としたAI導入支援   Vol.2
[無料 WEB セミナー] [詳細]
「画像認識 AI PoC スターターパック」の紹介
既に AI 技術を実ビジネスで活用し、成果を上げている日本企業も多く存在しており、競争優位なビジネスを展開しております。
しかしながら AI を導入したくとも PoC (概念実証) だけでも高額な費用がかかり取組めていない企業も少なくないようです。A I導入時には欠かせない PoC を手軽にしかも短期間で認知度を確認可能とするサービの紹介と共に、AI 技術の特性と具体的な導入プロセスに加え運用時のポイントについても解説いたします。
日時:2021年10月13日(水)
会場:WEBセミナー
共催:クラスキャット、日本FLOW(株)
後援:働き方改革推進コンソーシアム
参加費: 無料 (事前登録制)
人工知能開発支援
◆ クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
(株)クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com