TensorFlow 2.0 Alpha : ガイド : Keras : TensorFlow Keras で層とモデルを書く (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 03/18/2019
* 本ページは、TensorFlow の本家サイトの TF 2.0 Alpha の以下のページを翻訳した上で適宜、補足説明したものです:
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
ガイド : Keras : TensorFlow Keras で層とモデルを書く
セットアップ
from __future__ import absolute_import, division, print_function !pip install -q tensorflow==2.0.0-alpha0 import tensorflow as tf tf.keras.backend.clear_session() # For easy reset of notebook state.
Layer クラス
層は状態 (重み) と幾つかの計算をカプセル化します
貴方が作業する主要なデータ構造は Layer です。層は状態 (層の「重み」) と入力から出力への変換 (“call”、層の forward パス) をカプセル化します。
ここに密結合の層があります。それは状態: 変数 w と b を持ちます。
from tensorflow.keras import layers
class Linear(layers.Layer):
def __init__(self, units=32, input_dim=32):
super(Linear, self).__init__()
w_init = tf.random_normal_initializer()
self.w = tf.Variable(initial_value=w_init(shape=(input_dim, units),
dtype='float32'),
trainable=True)
b_init = tf.zeros_initializer()
self.b = tf.Variable(initial_value=b_init(shape=(units,),
dtype='float32'),
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor( [[ 0.02647757 0.02421902 -0.22342986 -0.08417849] [ 0.02647757 0.02421902 -0.22342986 -0.08417849]], shape=(2, 4), dtype=float32)
重み w と b は層の属性として設定されて自動的に追跡されます :
assert linear_layer.weights == [linear_layer.w, linear_layer.b]
重みを層に追加するためのより簡単なショートカットへのアクセスも持つことに注意してください : add_weight メソッドです :
class Linear(layers.Layer):
def __init__(self, units=32, input_dim=32):
super(Linear, self).__init__()
self.w = self.add_weight(shape=(input_dim, units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(units,),
initializer='zeros',
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
x = tf.ones((2, 2))
linear_layer = Linear(4, 2)
y = linear_layer(x)
print(y)
tf.Tensor( [[-0.00899224 -0.22110203 0.00576357 -0.14656523] [-0.00899224 -0.22110203 0.00576357 -0.14656523]], shape=(2, 4), dtype=float32)
層は非訓練可能な重みを持つことができます
訓練可能な重みの他に、層に非訓練可能な重みを追加することもできます。そのような重みは、層を訓練しているとき、逆伝播の間は考慮されません。
ここに非訓練可能な重みをどのように追加して使用するかがあります :
class ComputeSum(layers.Layer):
def __init__(self, input_dim):
super(ComputeSum, self).__init__()
self.total = tf.Variable(initial_value=tf.zeros((input_dim,)),
trainable=False)
def call(self, inputs):
self.total.assign_add(tf.reduce_sum(inputs, axis=0))
return self.total
x = tf.ones((2, 2))
my_sum = ComputeSum(2)
y = my_sum(x)
print(y.numpy())
y = my_sum(x)
print(y.numpy())
[2. 2.] [4. 4.]
それは layer.weights の一部ですが、それは非訓練可能な重みとして分類されます :
print('weights:', len(my_sum.weights))
print('non-trainable weights:', len(my_sum.non_trainable_weights))
# It's not included in the trainable weights:
print('trainable_weights:', my_sum.trainable_weights)
weights: 1 non-trainable weights: 1 trainable_weights: []
ベストプラクティス: 入力の shape が知れるまで重み作成を遅延する
上のロジスティック回帰のサンプルでは、私達の Linear 層は __init__ で重み w と b の shape を計算するために使用された input_dim 引数を取りました :
class Linear(layers.Layer):
def __init__(self, units=32, input_dim=32):
super(Linear, self).__init__()
self.w = self.add_weight(shape=(input_dim, units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(units,),
initializer='random_normal',
trainable=True)
多くの場合、貴方は入力のサイズを前もって知らないかもしれません、そして重みをその値が知れるときに遅れて作成したいでしょう、時に層をインスタンス化した後に。
Keras API では、層重みを貴方の層の build(inputs_shape) メソッドで作成することを推奨します。このようにです :
class Linear(layers.Layer):
def __init__(self, units=32):
super(Linear, self).__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='random_normal',
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
貴方の層の __call__ メソッドは最初にそれが呼び出されたときに build を自動的に実行します。今貴方は lazy で簡単に利用できる層を持ちます :
linear_layer = Linear(32) # At instantiation, we don't know on what inputs this is going to get called y = linear_layer(x) # The layer's weights are created dynamically the first time the layer is called
層は再帰的に構成可能です
Layer インスタンスを他の層の属性として割り当てる場合、他の層は内側の層の重みを追跡し始めるでしょう。
そのような副層 (= sublayer) は __init__ メソッドで作成することを勧めます (何故ならば副層は典型的には build メソッドを持つので、それらは外側の層が構築されるときに構築されるからです)。
# Let's assume we are reusing the Linear class
# with a `build` method that we defined above.
class MLPBlock(layers.Layer):
def __init__(self):
super(MLPBlock, self).__init__()
self.linear_1 = Linear(32)
self.linear_2 = Linear(32)
self.linear_3 = Linear(1)
def call(self, inputs):
x = self.linear_1(inputs)
x = tf.nn.relu(x)
x = self.linear_2(x)
x = tf.nn.relu(x)
return self.linear_3(x)
mlp = MLPBlock()
y = mlp(tf.ones(shape=(3, 64))) # The first call to the `mlp` will create the weights
print('weights:', len(mlp.weights))
print('trainable weights:', len(mlp.trainable_weights))
weights: 6 trainable weights: 6
層は forward パスの間に作成された損失を再帰的に集めます
層の call メソッドを書くとき、貴方の訓練ループを書くときに、後で使用することを望む損失 tensor を作成できます。これは self.add_loss(value) を呼び出すことで行なうことができます :
# A layer that creates an activity regularization loss
class ActivityRegularizationLayer(layers.Layer):
def __init__(self, rate=1e-2):
super(ActivityRegularizationLayer, self).__init__()
self.rate = rate
def call(self, inputs):
self.add_loss(self.rate * tf.reduce_sum(inputs))
return inputs
(任意の内側の層で作成それたものを含む) これらの損失は layer.losses を通して取得できます。このプロパティは top-level 層への総ての __call__ の開始でリセットされますので、layer.losses は最後の forward パスの間に作成された損失値を常に含みます。
class OuterLayer(layers.Layer):
def __init__(self):
super(OuterLayer, self).__init__()
self.activity_reg = ActivityRegularizationLayer(1e-2)
def call(self, inputs):
return self.activity_reg(inputs)
layer = OuterLayer()
assert len(layer.losses) == 0 # No losses yet since the layer has never been called
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1 # We created one loss value
# `layer.losses` gets reset at the start of each __call__
_ = layer(tf.zeros(1, 1))
assert len(layer.losses) == 1 # This is the loss created during the call above
加えて、loss プロパティはまた任意の内側の層の重みのために作成された正則化損失を含みます :
class OuterLayer(layers.Layer):
def __init__(self):
super(OuterLayer, self).__init__()
self.dense = layers.Dense(32, kernel_regularizer=tf.keras.regularizers.l2(1e-3))
def call(self, inputs):
return self.dense(inputs)
layer = OuterLayer()
_ = layer(tf.zeros((1, 1)))
# This is `1e-3 * sum(layer.dense.kernel)`,
# created by the `kernel_regularizer` above.
print(layer.losses)
[<tf.Tensor: id=249, shape=(), dtype=float32, numpy=0.002372378>]
これらの損失は訓練ループを書くときに考慮されます、このようにです :
# Instantiate an optimizer.
optimizer = tf.keras.optimizers.SGD(learning_rate=1e-3)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Iterate over the batches of a dataset.
for x_batch_train, y_batch_train in train_dataset:
with tf.GradientTape() as tape:
logits = layer(x_batch_train) # Logits for this minibatch
# Loss value for this minibatch
loss_value = loss_fn(y_batch_train, logits))
# Add extra losses created during this forward pass:
loss_value += sum(model.losses)
grads = tape.gradient(loss_value, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
訓練ループを書くための詳細なガイドについては、訓練 & 評価 へのガイド を見てください。
オプションで貴方の層上でシリアライゼーションを有効にできます
貴方のカスタム層が Functional モデル の一部としてシリアライズ可能であることが必要ならば、オプションで get_config メソッドを実装できます :
class Linear(layers.Layer):
def __init__(self, units=32):
super(Linear, self).__init__()
self.units = units
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='random_normal',
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
def get_config(self):
return {'units': self.units}
# Now you can recreate the layer from its config:
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'units': 64}
基底 Layer クラスの __init__ メソッドは幾つかのキーワード引数、特に名前と dtype を取ることに注意してください。これらの引数を __init__ で親クラス渡してそれらを層 config に含めることは良い実践です :
class Linear(layers.Layer):
def __init__(self, units=32, **kwargs):
super(Linear, self).__init__(**kwargs)
self.units = units
def build(self, input_shape):
self.w = self.add_weight(shape=(input_shape[-1], self.units),
initializer='random_normal',
trainable=True)
self.b = self.add_weight(shape=(self.units,),
initializer='random_normal',
trainable=True)
def call(self, inputs):
return tf.matmul(inputs, self.w) + self.b
def get_config(self):
config = super(Linear, self).get_config()
config.update({'units': self.units})
return config
layer = Linear(64)
config = layer.get_config()
print(config)
new_layer = Linear.from_config(config)
{'trainable': True, 'dtype': None, 'name': 'linear_8', 'units': 64}
層をその config からデシリアライズするときにより柔軟性を必要とする場合には、from_config クラス・メソッドをオーバーライドすることもできます。これは from_config の基底実装です :
def from_config(cls, config): return cls(**config)
シリアライゼーションとセービングについて更に学習するためには、完全な Guide to Saving and Serializing Models を見てください。
call メソッドの特権 training 引数
幾つかの層、特にBatchNormalization 層と Dropout 層は訓練と推論の間に異なる挙動を持ちます。そのような層のために、training (ブーリアン) 引数を晒すのは標準的な実践です。
call でこの引数を晒すことで、組み込み訓練と評価ループ (e.g. fit) に訓練と推論で層を正しく使用することを可能にします。
class CustomDropout(layers.Layer):
def __init__(self, rate, **kwargs):
super(CustomDropout, self).__init__(**kwargs)
self.rate = rate
def call(self, inputs, training=None):
return tf.cond(training,
lambda: tf.nn.dropout(inputs, rate=self.rate),
lambda: inputs)
モデルを構築する
Model クラス
一般に、内側の計算ブロックを定義するために Layer クラスを使用し、そして外側のモデル — 貴方が訓練するオブジェクト — を定義するために Model クラスを定義します。
例えば、ResNet 50 モデルで、幾つかの ResNet ブロックのサブクラス化された層と、ResNet 50 ネットワーク全体を取り囲む単一のモデルを持つでしょう。
Model クラスは Layer と同じ API を持ちますが、次の違いがあります :- それは組み込み訓練、評価と予測ループを公開します (model.fit(), model.evaluate(), model.predict())。- それはその内側の層のリストを model.layers で公開します。- それはセービングとシリアライゼーション API を公開します。
実際上、”Layer” クラスは著述では (「畳込み層」や「リカレント層」内の) 「層 (= layer)」や (「ResNet ブロック」や「Inception ブロック」内の) 「ブロック”」として言及するものに対応しています。
一方で、”Model” クラスは著述で (「深層学習モデル」内の)「モデル」や (「深層ニューラルネットワーク」内の)「ネットワーク」として言及されるもに対応しています。
例えば、上の mini-resnet サンプルを取り、それを (fit() で訓練可能で save_weights でセーブできるような) Model を構築するために使用できるでしょう :
class ResNet(tf.keras.Model):
def __init__(self):
super(ResNet, self).__init__()
self.block_1 = ResNetBlock()
self.block_2 = ResNetBlock()
self.global_pool = layers.GlobalAveragePooling2D()
self.classifier = Dense(num_classes)
def call(self, inputs):
x = self.block_1(inputs)
x = self.block_2(x)
x = self.global_pool(x)
return self.classifier(x)
resnet = ResNet()
dataset = ...
resnet.fit(dataset, epochs=10)
resnet.save_weights(filepath)
総てを一つにまとめる : end-to-end サンプル
ここに貴方がここまでに学習したものがありあす :- 層は (__init__ か build で作成された) 状態と (call での) ある計算をカプセル化します。- 層は新しい、より大きな計算ブロックを作成するために再帰的にネストできます。- 層は損失を作成して追跡できます (典型的には正則化損失)。- 外側のコンテナ、貴方が訓練したいものはモデルです。モデルはちょうど層のようなものですが、追加の訓練とシリアライゼーション・ユティリティを持ちます。
これら総てのものを end-to-end サンプルにまとめましょう : 変分オートエンコーダ (VAE) を実装していきます。それを MNIST 数字の上で訓練します。
私達の VAE は Model のサブクラスで、 Layer をサブクラス化した層のネストされた構成として構築されます。それは正則化損失 (KL ダイバージェンス) をフィーチャーします。
class Sampling(layers.Layer):
"""Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""
def call(self, inputs):
z_mean, z_log_var = inputs
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
return z_mean + tf.exp(0.5 * z_log_var) * epsilon
class Encoder(layers.Layer):
"""Maps MNIST digits to a triplet (z_mean, z_log_var, z)."""
def __init__(self,
latent_dim=32,
intermediate_dim=64,
name='encoder',
**kwargs):
super(Encoder, self).__init__(name=name, **kwargs)
self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
self.dense_mean = layers.Dense(latent_dim)
self.dense_log_var = layers.Dense(latent_dim)
self.sampling = Sampling()
def call(self, inputs):
x = self.dense_proj(inputs)
z_mean = self.dense_mean(x)
z_log_var = self.dense_log_var(x)
z = self.sampling((z_mean, z_log_var))
return z_mean, z_log_var, z
class Decoder(layers.Layer):
"""Converts z, the encoded digit vector, back into a readable digit."""
def __init__(self,
original_dim,
intermediate_dim=64,
name='decoder',
**kwargs):
super(Decoder, self).__init__(name=name, **kwargs)
self.dense_proj = layers.Dense(intermediate_dim, activation='relu')
self.dense_output = layers.Dense(original_dim, activation='sigmoid')
def call(self, inputs):
x = self.dense_proj(inputs)
return self.dense_output(x)
class VariationalAutoEncoder(tf.keras.Model):
"""Combines the encoder and decoder into an end-to-end model for training."""
def __init__(self,
original_dim,
intermediate_dim=64,
latent_dim=32,
name='autoencoder',
**kwargs):
super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
self.original_dim = original_dim
self.encoder = Encoder(latent_dim=latent_dim,
intermediate_dim=intermediate_dim)
self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
def call(self, inputs):
z_mean, z_log_var, z = self.encoder(inputs)
reconstructed = self.decoder(z)
# Add KL divergence regularization loss.
kl_loss = - 0.5 * tf.reduce_sum(
z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
self.add_loss(kl_loss)
return reconstructed
original_dim = 784
vae = VariationalAutoEncoder(original_dim, 64, 32)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
mse_loss_fn = tf.keras.losses.MeanSquaredError()
loss_metric = tf.keras.metrics.Mean()
(x_train, _), _ = tf.keras.datasets.mnist.load_data()
x_train = x_train.reshape(60000, 784).astype('float32') / 255
train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)
# Iterate over epochs.
for epoch in range(3):
print('Start of epoch %d' % (epoch,))
# Iterate over the batches of the dataset.
for step, x_batch_train in enumerate(train_dataset):
with tf.GradientTape() as tape:
reconstructed = vae(x_batch_train)
# Compute reconstruction loss
loss = mse_loss_fn(x_batch_train, reconstructed)
loss += sum(vae.losses) # Add KLD regularization loss
grads = tape.gradient(loss, vae.trainable_variables)
optimizer.apply_gradients(zip(grads, vae.trainable_variables))
loss_metric(loss)
if step % 100 == 0:
print('step %s: mean loss = %s' % (step, loss_metric.result()))
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step Start of epoch 0 step 0: mean loss = tf.Tensor(161.00046, shape=(), dtype=float32) step 100: mean loss = tf.Tensor(5.378642, shape=(), dtype=float32) step 200: mean loss = tf.Tensor(2.7485619, shape=(), dtype=float32) step 300: mean loss = tf.Tensor(1.8636498, shape=(), dtype=float32) step 400: mean loss = tf.Tensor(1.4188905, shape=(), dtype=float32) step 500: mean loss = tf.Tensor(1.1506605, shape=(), dtype=float32) step 600: mean loss = tf.Tensor(0.9712579, shape=(), dtype=float32) step 700: mean loss = tf.Tensor(0.84277666, shape=(), dtype=float32) step 800: mean loss = tf.Tensor(0.7465494, shape=(), dtype=float32) step 900: mean loss = tf.Tensor(0.6713111, shape=(), dtype=float32) Start of epoch 1 step 0: mean loss = tf.Tensor(0.6469352, shape=(), dtype=float32) step 100: mean loss = tf.Tensor(0.59141695, shape=(), dtype=float32) step 200: mean loss = tf.Tensor(0.54561764, shape=(), dtype=float32) step 300: mean loss = tf.Tensor(0.50713503, shape=(), dtype=float32) step 400: mean loss = tf.Tensor(0.4744861, shape=(), dtype=float32) step 500: mean loss = tf.Tensor(0.44623637, shape=(), dtype=float32) step 600: mean loss = tf.Tensor(0.4216811, shape=(), dtype=float32) step 700: mean loss = tf.Tensor(0.4000843, shape=(), dtype=float32) step 800: mean loss = tf.Tensor(0.38100395, shape=(), dtype=float32) step 900: mean loss = tf.Tensor(0.3639242, shape=(), dtype=float32) Start of epoch 2 step 0: mean loss = tf.Tensor(0.35792458, shape=(), dtype=float32) step 100: mean loss = tf.Tensor(0.34327272, shape=(), dtype=float32) step 200: mean loss = tf.Tensor(0.33004242, shape=(), dtype=float32) step 300: mean loss = tf.Tensor(0.31798688, shape=(), dtype=float32) step 400: mean loss = tf.Tensor(0.3070505, shape=(), dtype=float32) step 500: mean loss = tf.Tensor(0.29695606, shape=(), dtype=float32) step 600: mean loss = tf.Tensor(0.28770587, shape=(), dtype=float32) step 700: mean loss = tf.Tensor(0.2791505, shape=(), dtype=float32) step 800: mean loss = tf.Tensor(0.27125034, shape=(), dtype=float32) step 900: mean loss = tf.Tensor(0.26388252, shape=(), dtype=float32)
VAE は Model をサブクラス化していますから、それは組み込み訓練ループをフィーチャーします。従ってそれをこのように訓練することもできるでしょう :
vae = VariationalAutoEncoder(784, 64, 32) optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError()) vae.fit(x_train, x_train, epochs=3, batch_size=64)
Epoch 1/3 60000/60000 [==============================] - 4s 67us/sample - loss: 0.7372 Epoch 2/3 60000/60000 [==============================] - 4s 64us/sample - loss: 0.0686 Epoch 3/3 60000/60000 [==============================] - 4s 65us/sample - loss: 0.0678 <tensorflow.python.keras.callbacks.History at 0x7f94e85c0b70>
オブジェクト指向開発を越えて : Functional API
このサンプルは貴方にとってオブジェクト指向開発に過ぎたでしょうか?Functional API を使用してモデルを構築することもできます。重要なことは、一つのスタイルか別のものかの選択は他のスタイルで書かれたコンポーネントを活用することを妨げないことです : 貴方は常に上手く組み合わせることができます。
例えば、下の Functional API サンプルは上の例で定義した同じ Sampling 層を再利用しています。
original_dim = 784
intermediate_dim = 64
latent_dim = 32
# Define encoder model.
original_inputs = tf.keras.Input(shape=(original_dim,), name='encoder_input')
x = layers.Dense(intermediate_dim, activation='relu')(original_inputs)
z_mean = layers.Dense(latent_dim, name='z_mean')(x)
z_log_var = layers.Dense(latent_dim, name='z_log_var')(x)
z = Sampling()((z_mean, z_log_var))
encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name='encoder')
# Define decoder model.
latent_inputs = tf.keras.Input(shape=(latent_dim,), name='z_sampling')
x = layers.Dense(intermediate_dim, activation='relu')(latent_inputs)
outputs = layers.Dense(original_dim, activation='sigmoid')(x)
decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name='decoder')
# Define VAE model.
outputs = decoder(z)
vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name='vae')
# Add KL divergence regularization loss.
kl_loss = - 0.5 * tf.reduce_sum(
z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
vae.add_loss(kl_loss)
# Train.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
vae.fit(x_train, x_train, epochs=3, batch_size=64)
Epoch 1/3 60000/60000 [==============================] - 4s 67us/sample - loss: 0.9678 Epoch 2/3 60000/60000 [==============================] - 4s 64us/sample - loss: 0.0693 Epoch 3/3 60000/60000 [==============================] - 4s 66us/sample - loss: 0.0679 <tensorflow.python.keras.callbacks.History at 0x7f94d413d550>
以上