TensorFlow : Tutorials : 画像 : Pix2Pix (Conditional GAN) (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 09/06/2018
* TensorFlow 1.10 で更に改訂されています。
* TensorFlow 1.9 でドキュメント構成が変更され、数篇が新規に追加されましたので再翻訳しました。
* 本ページは、TensorFlow の本家サイトの Tutorials – Images の以下のページを翻訳した上で
適宜、補足説明したものです:
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
- Windows PC のブラウザからご参加が可能です。スマートデバイスもご利用可能です。
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
| 株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション |
| E-Mail:sales-info@classcat.com ; WebSite: https://www.classcat.com/ |
| Facebook: https://www.facebook.com/ClassCatJP/ |
Pix2Pix : tf.keras と eager のサンプル
このノートブックは Image-to-Image Translation with Conditional Adversarial Networks で記述されている、conditional GAN を使用して画像から画像への変換を示します。このテクニックを使用して白黒写真を彩色したり、google マップを google earth に変換したりする等のことができます。ここでは、建物の正面 (= facade) を real な建物に変換します。これを成すために tf.keras と eager execution を使用します。
サンプルでは、CMP Facade データベースを使用します、これはプラハの Czech Technical University の Center for Machine Perception により役立つように提供されています。サンプルを短く保持するために、上のペーパーの著者により作成された、このデータセットの前処理された コピー を使用します。
各エポックは単一の P100 上で 58 秒前後かかります。
下は 200 エポックの間モデルを訓練した後に生成された出力です。
sample output_1

TensorFlow をインポートして eager execution を有効にする
# Import TensorFlow >= 1.10 and enable eager execution import tensorflow as tf tf.enable_eager_execution() import os import time import numpy as np import matplotlib.pyplot as plt import PIL from IPython.display import clear_output
データセットをロードする
データセットと類似のデータセットを ここ からダウンロードできます。
(上の) ペーパーで言及されているように、訓練データセットにランダムに jittering とミラーリングを適用します。
- ランダム jittering では、画像は 286 x 286 にリサイズされてからランダムに 256 x 256 にクロップされます。
- ランダム・ミラーリングでは、画像は水平に i.e. 左から右にランダムにフリップ (反転) されます。
path_to_zip = tf.keras.utils.get_file('facades.tar.gz',
cache_subdir=os.path.abspath('.'),
origin='https://people.eecs.berkeley.edu/~tinghuiz/projects/pix2pix/datasets/facades.tar.gz',
extract=True)
PATH = os.path.join(os.path.dirname(path_to_zip), 'facades/')
BUFFER_SIZE = 400 BATCH_SIZE = 1 IMG_WIDTH = 256 IMG_HEIGHT = 256
def load_image(image_file, is_train):
image = tf.read_file(image_file)
image = tf.image.decode_jpeg(image)
w = tf.shape(image)[1]
w = w // 2
real_image = image[:, :w, :]
input_image = image[:, w:, :]
input_image = tf.cast(input_image, tf.float32)
real_image = tf.cast(real_image, tf.float32)
if is_train:
# random jittering
# resizing to 286 x 286 x 3
input_image = tf.image.resize_images(input_image, [286, 286],
align_corners=True,
method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
real_image = tf.image.resize_images(real_image, [286, 286],
align_corners=True,
method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)
# randomly cropping to 256 x 256 x 3
stacked_image = tf.stack([input_image, real_image], axis=0)
cropped_image = tf.random_crop(stacked_image, size=[2, IMG_HEIGHT, IMG_WIDTH, 3])
input_image, real_image = cropped_image[0], cropped_image[1]
if np.random.random() > 0.5:
# random mirroring
input_image = tf.image.flip_left_right(input_image)
real_image = tf.image.flip_left_right(real_image)
else:
input_image = tf.image.resize_images(input_image, size=[IMG_HEIGHT, IMG_WIDTH],
align_corners=True, method=2)
real_image = tf.image.resize_images(real_image, size=[IMG_HEIGHT, IMG_WIDTH],
align_corners=True, method=2)
# normalizing the images to [-1, 1]
input_image = (input_image / 127.5) - 1
real_image = (real_image / 127.5) - 1
return input_image, real_image
バッチを作成し、データセットをマップして (前処理を行なって) シャッフルするために tf.data を使用する
train_dataset = tf.data.Dataset.list_files(PATH+'train/*.jpg') train_dataset = train_dataset.shuffle(BUFFER_SIZE) train_dataset = train_dataset.map(lambda x: load_image(x, True)) train_dataset = train_dataset.batch(1)
test_dataset = tf.data.Dataset.list_files(PATH+'test/*.jpg') test_dataset = test_dataset.map(lambda x: load_image(x, False)) test_dataset = test_dataset.batch(1)
generator と discriminator モデルを書く
- Generator
- gvenerator のアーキテクチャは変更された U-Net です。
- エンコーダの各ブロックは (Conv -> Batchnorm -> Leaky ReLU)
- デコーダの各ブロックは (Transposed Conv -> Batchnorm -> Dropout(applied to the first 3 blocks) -> ReLU)
- エンコーダとデコーダの間に (U-Net 内のように) スキップ・コネクションがあります。
- Discriminator
- discriminator は PatchGAN です。
- discriminator の各ブロックは (Conv -> BatchNorm -> Leaky ReLU) です。
- 最後の層の後の出力の shape は (batch_size, 30, 30, 1) です。
- 出力の各 30×30 パッチが入力画像の 70×70 の断片を分類します (そのようなアーキテクチャは PatchGAN と呼ばれます)。
- discriminator は 2 つの入力を受け取ります。
- 入力画像とターゲット画像、これは real としてえ分類されるべきです。
- 入力画像と生成画像 (generator の出力)、これは fake として分類されるべきです。
- これらの 2 つの入力をコード (tf.concat([inp, tar], axis=-1)) で一緒に結合します。
- generator と discriminator を通して進む入力の shape はコードのコメントにあります。
アーキテクチャとハイパーパラメータについて更に学習するためには、ペーパー を参照できます。
OUTPUT_CHANNELS = 3
class Downsample(tf.keras.Model):
def __init__(self, filters, size, apply_batchnorm=True):
super(Downsample, self).__init__()
self.apply_batchnorm = apply_batchnorm
initializer = tf.random_normal_initializer(0., 0.02)
self.conv1 = tf.keras.layers.Conv2D(filters,
(size, size),
strides=2,
padding='same',
kernel_initializer=initializer,
use_bias=False)
if self.apply_batchnorm:
self.batchnorm = tf.keras.layers.BatchNormalization()
def call(self, x, training):
x = self.conv1(x)
if self.apply_batchnorm:
x = self.batchnorm(x, training=training)
x = tf.nn.leaky_relu(x)
return x
class Upsample(tf.keras.Model):
def __init__(self, filters, size, apply_dropout=False):
super(Upsample, self).__init__()
self.apply_dropout = apply_dropout
initializer = tf.random_normal_initializer(0., 0.02)
self.up_conv = tf.keras.layers.Conv2DTranspose(filters,
(size, size),
strides=2,
padding='same',
kernel_initializer=initializer,
use_bias=False)
self.batchnorm = tf.keras.layers.BatchNormalization()
if self.apply_dropout:
self.dropout = tf.keras.layers.Dropout(0.5)
def call(self, x1, x2, training):
x = self.up_conv(x1)
x = self.batchnorm(x, training=training)
if self.apply_dropout:
x = self.dropout(x, training=training)
x = tf.nn.relu(x)
x = tf.concat([x, x2], axis=-1)
return x
class Generator(tf.keras.Model):
def __init__(self):
super(Generator, self).__init__()
initializer = tf.random_normal_initializer(0., 0.02)
self.down1 = Downsample(64, 4, apply_batchnorm=False)
self.down2 = Downsample(128, 4)
self.down3 = Downsample(256, 4)
self.down4 = Downsample(512, 4)
self.down5 = Downsample(512, 4)
self.down6 = Downsample(512, 4)
self.down7 = Downsample(512, 4)
self.down8 = Downsample(512, 4)
self.up1 = Upsample(512, 4, apply_dropout=True)
self.up2 = Upsample(512, 4, apply_dropout=True)
self.up3 = Upsample(512, 4, apply_dropout=True)
self.up4 = Upsample(512, 4)
self.up5 = Upsample(256, 4)
self.up6 = Upsample(128, 4)
self.up7 = Upsample(64, 4)
self.last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS,
(4, 4),
strides=2,
padding='same',
kernel_initializer=initializer)
@tf.contrib.eager.defun
def call(self, x, training):
# x shape == (bs, 256, 256, 3)
x1 = self.down1(x, training=training) # (bs, 128, 128, 64)
x2 = self.down2(x1, training=training) # (bs, 64, 64, 128)
x3 = self.down3(x2, training=training) # (bs, 32, 32, 256)
x4 = self.down4(x3, training=training) # (bs, 16, 16, 512)
x5 = self.down5(x4, training=training) # (bs, 8, 8, 512)
x6 = self.down6(x5, training=training) # (bs, 4, 4, 512)
x7 = self.down7(x6, training=training) # (bs, 2, 2, 512)
x8 = self.down8(x7, training=training) # (bs, 1, 1, 512)
x9 = self.up1(x8, x7, training=training) # (bs, 2, 2, 1024)
x10 = self.up2(x9, x6, training=training) # (bs, 4, 4, 1024)
x11 = self.up3(x10, x5, training=training) # (bs, 8, 8, 1024)
x12 = self.up4(x11, x4, training=training) # (bs, 16, 16, 1024)
x13 = self.up5(x12, x3, training=training) # (bs, 32, 32, 512)
x14 = self.up6(x13, x2, training=training) # (bs, 64, 64, 256)
x15 = self.up7(x14, x1, training=training) # (bs, 128, 128, 128)
x16 = self.last(x15) # (bs, 256, 256, 3)
x16 = tf.nn.tanh(x16)
return x16
class DiscDownsample(tf.keras.Model):
def __init__(self, filters, size, apply_batchnorm=True):
super(DiscDownsample, self).__init__()
self.apply_batchnorm = apply_batchnorm
initializer = tf.random_normal_initializer(0., 0.02)
self.conv1 = tf.keras.layers.Conv2D(filters,
(size, size),
strides=2,
padding='same',
kernel_initializer=initializer,
use_bias=False)
if self.apply_batchnorm:
self.batchnorm = tf.keras.layers.BatchNormalization()
def call(self, x, training):
x = self.conv1(x)
if self.apply_batchnorm:
x = self.batchnorm(x, training=training)
x = tf.nn.leaky_relu(x)
return x
class Discriminator(tf.keras.Model):
def __init__(self):
super(Discriminator, self).__init__()
initializer = tf.random_normal_initializer(0., 0.02)
self.down1 = DiscDownsample(64, 4, False)
self.down2 = DiscDownsample(128, 4)
self.down3 = DiscDownsample(256, 4)
# we are zero padding here with 1 because we need our shape to
# go from (batch_size, 32, 32, 256) to (batch_size, 31, 31, 512)
self.zero_pad1 = tf.keras.layers.ZeroPadding2D()
self.conv = tf.keras.layers.Conv2D(512,
(4, 4),
strides=1,
kernel_initializer=initializer,
use_bias=False)
self.batchnorm1 = tf.keras.layers.BatchNormalization()
# shape change from (batch_size, 31, 31, 512) to (batch_size, 30, 30, 1)
self.zero_pad2 = tf.keras.layers.ZeroPadding2D()
self.last = tf.keras.layers.Conv2D(1,
(4, 4),
strides=1,
kernel_initializer=initializer)
@tf.contrib.eager.defun
def call(self, inp, tar, training):
# concatenating the input and the target
x = tf.concat([inp, tar], axis=-1) # (bs, 256, 256, channels*2)
x = self.down1(x, training=training) # (bs, 128, 128, 64)
x = self.down2(x, training=training) # (bs, 64, 64, 128)
x = self.down3(x, training=training) # (bs, 32, 32, 256)
x = self.zero_pad1(x) # (bs, 34, 34, 256)
x = self.conv(x) # (bs, 31, 31, 512)
x = self.batchnorm1(x, training=training)
x = tf.nn.leaky_relu(x)
x = self.zero_pad2(x) # (bs, 33, 33, 512)
# don't add a sigmoid activation here since
# the loss function expects raw logits.
x = self.last(x) # (bs, 30, 30, 1)
return x
# The call function of Generator and Discriminator have been decorated # with tf.contrib.eager.defun() # We get a performance speedup if defun is used (~25 seconds per epoch) generator = Generator() discriminator = Discriminator()
損失関数と optimizer を定義する
- Discriminator 損失
- discriminator 損失関数は 2 つの入力を取ります; real 画像、生成画像です。
- real_loss は real 画像と 1 の配列 (何故ならばこれらは real 画像だからです) の sigmod 交差エントロピー損失です。
- generated_loss は生成画像とゼロの配列 (何故ならばこれらは fake 画像だからです) の sigmod 交差エントロピー損失です。
- それから total_loss は real_loss と generated_loss の合計です。
- Generator 損失
- それは生成画像と 1 の配列の sigmoid 交差エントロピー損失です。
- ペーパーはまた L1 損失を含みます、これは生成画像とターゲット画像の間の MAE (mean absolute error, 平均絶対誤差) です。
- これは生成画像にターゲット画像に構造的に類似することを可能にします。
- 総計の generator 損失を計算するための式は = gan_loss + LAMBDA * l1_loss, ここで LAMBDA = 100 です。この値はペーパーの著者により決められました。
LAMBDA = 100
def discriminator_loss(disc_real_output, disc_generated_output):
real_loss = tf.losses.sigmoid_cross_entropy(multi_class_labels = tf.ones_like(disc_real_output),
logits = disc_real_output)
generated_loss = tf.losses.sigmoid_cross_entropy(multi_class_labels = tf.zeros_like(disc_generated_output),
logits = disc_generated_output)
total_disc_loss = real_loss + generated_loss
return total_disc_loss
def generator_loss(disc_generated_output, gen_output, target):
gan_loss = tf.losses.sigmoid_cross_entropy(multi_class_labels = tf.ones_like(disc_generated_output),
logits = disc_generated_output)
# mean absolute error
l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
total_gen_loss = gan_loss + (LAMBDA * l1_loss)
return total_gen_loss
generator_optimizer = tf.train.AdamOptimizer(2e-4, beta1=0.5) discriminator_optimizer = tf.train.AdamOptimizer(2e-4, beta1=0.5)
チェックポイント (オブジェクト・ベースのセーブ)
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
discriminator_optimizer=discriminator_optimizer,
generator=generator,
discriminator=discriminator)
訓練
- データセットに渡り反復することから開始します。
- generator は入力画像を取り、そして私達は生成出力を得ます。
- discriminator は最初の入力として input_image と生成画像を受け取ります。2 番目の入力は input_image と the target_image です。
- 次に generator と discriminator 損失を計算します。
- それから、generator と discrimator 変数 (入力) の両者に関する損失の勾配を計算してそれらを optimizer に適用します。
生成画像
- 訓練後、幾つかの画像を生成するときです!
- テスト・データセットから generator に画像を渡します。
- それから generator は入力画像を私達が期待する出力に翻訳します。
- 最後のステップは予測をプロットすることです、そして voila !
EPOCHS = 200
def generate_images(model, test_input, tar):
# the training=True is intentional here since
# we want the batch statistics while running the model
# on the test dataset. If we use training=False, we will get
# the accumulated statistics learned from the training dataset
# (which we don't want)
prediction = model(test_input, training=True)
plt.figure(figsize=(15,15))
display_list = [test_input[0], tar[0], prediction[0]]
title = ['Input Image', 'Ground Truth', 'Predicted Image']
for i in range(3):
plt.subplot(1, 3, i+1)
plt.title(title[i])
# getting the pixel values between [0, 1] to plot it.
plt.imshow(display_list[i] * 0.5 + 0.5)
plt.axis('off')
plt.show()
def train(dataset, epochs):
for epoch in range(epochs):
start = time.time()
for input_image, target in dataset:
with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
gen_output = generator(input_image, training=True)
disc_real_output = discriminator(input_image, target, training=True)
disc_generated_output = discriminator(input_image, gen_output, training=True)
gen_loss = generator_loss(disc_generated_output, gen_output, target)
disc_loss = discriminator_loss(disc_real_output, disc_generated_output)
generator_gradients = gen_tape.gradient(gen_loss,
generator.variables)
discriminator_gradients = disc_tape.gradient(disc_loss,
discriminator.variables)
generator_optimizer.apply_gradients(zip(generator_gradients,
generator.variables))
discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
discriminator.variables))
if epoch % 1 == 0:
clear_output(wait=True)
for inp, tar in test_dataset.take(1):
generate_images(generator, inp, tar)
# saving (checkpoint) the model every 20 epochs
if (epoch + 1) % 20 == 0:
checkpoint.save(file_prefix = checkpoint_prefix)
print ('Time taken for epoch {} is {} sec\n'.format(epoch + 1,
time.time()-start))
train(train_dataset, EPOCHS)
最新のチェックポイントをリストアしてテストする
# restoring the latest checkpoint in checkpoint_dir checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
テスト・データセット全体上のテスト
# Run the trained model on the entire test dataset for inp, tar in test_dataset: generate_images(generator, inp, tar)
以上