TensorFlow : Tutorials : 画像 : tf.keras で画像セグメンテーション (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 09/07/2018
* TensorFlow 1.10 で更に改訂されています。
* TensorFlow 1.9 でドキュメント構成が変更され、数篇が新規に追加されましたので再翻訳しました。
* 本ページは、TensorFlow の本家サイトの Tutorials – Images の以下のページを翻訳した上で
適宜、補足説明したものです:
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
tf.keras で画像セグメンテーション
このチュートリアルではどのように画像をセグメントするかを学習します。セグメンテーションは各ピクセルにおけるオブジェクトのクラスを可視化して、pixel-wise なセグメンテーションを生成するプロセスです。例えば、画像内の人々の位置と境界を識別したり画像から細胞核を識別したりできます。公式には、画像セグメンテーションは画像を識別することを望むピクセルのセット (私達のターゲット) と背景に分割するプロセスを参照します。
特に、このチュートリアルでは Kaggle Carvana 画像 Masking チャレンジ・データセット を使用しています。
データセットは巨大な数の自動車画像を含み、各自動車は異なる角度から取られています。
更に、各自動車画像について、関連する手動による cutout (切り抜き) マスクを持ちます ; タスクは見ていないデータについてこれらの cutout マスクを自動的に作成することです。
カバーされる特定の概念
プロセスでは、実践的な経験を構築して次の概念まわりの直感を開発します :
- Functional API – 私達は Functional API で UNet を実装します、生物医学の画像セグメンテーションのために古典的に使用された畳込みネットワーク・モデルです。
- このモデルは複数の入力/出力を必要とする層を持ちます。これは functional API の利用を必要とします。
- 元のペーパー、Olaf Ronneberge による U-Net: Convolutional Networks for Biomedical Image Segmentation を調べてください!
- カスタム損失関数とメトリクス – バイナリ交差エントロピーと Dice 損失を使用してカスタム損失関数を実装します。dice 係数 (これは損失のために使用されます) と平均 intersection over union もまた実装します、これは訓練プロセスを監視してどのくらい上手く遂行しているかを判断するに役立ちます。
- keras モデルをセーブしてロードする – 最善なモデルをディスクにセーブします。推論を遂行/モデルを評価することを望むとき、ディスクからモデルをロードします。
一般的なワークフローに従います :
- データを可視化する/何某かの調査的なデータ解析を遂行する
- データ・パイプラインをセットアップして前処理をする
- モデルを構築する
- モデルを訓練する
- モデルを評価する
- 反復
By: Raymond Yuan, Software Engineering Intern
!pip install kaggle
import os import glob import zipfile import functools import numpy as np import matplotlib.pyplot as plt import matplotlib as mpl mpl.rcParams['axes.grid'] = False mpl.rcParams['figure.figsize'] = (12,12) from sklearn.model_selection import train_test_split import matplotlib.image as mpimg import pandas as pd from PIL import Image
import tensorflow as tf import tensorflow.contrib as tfcontrib from tensorflow.python.keras import layers from tensorflow.python.keras import losses from tensorflow.python.keras import models from tensorflow.python.keras import backend as K
総てのファイルを取得する
このチュートリアルは Kaggle からのデータセットを使用していますので、それは貴方の Kaggle アカウントのための API トークンを作成して、それをアップロードすることを必要とします。
import os # Upload the API token. def get_kaggle_credentials(): token_dir = os.path.join(os.path.expanduser("~"),".kaggle") token_file = os.path.join(token_dir, "kaggle.json") if not os.path.isdir(token_dir): os.mkdir(token_dir) try: with open(token_file,'r') as f: pass except IOError as no_file: try: from google.colab import files except ImportError: raise no_file uploaded = files.upload() if "kaggle.json" not in uploaded: raise ValueError("You need an API key! see: " "https://github.com/Kaggle/kaggle-api#api-credentials") with open(token_file, "wb") as f: f.write(uploaded["kaggle.json"]) os.chmod(token_file, 600) get_kaggle_credentials()
credential を追加後に kaggle だけをインポートします。
import kaggle
Kaggle からデータをダウンロードする
警告、これから先に巨大なダウンロード – 総てのファイルのダウンロードはディスク容量の 14GB を必要とします。
competition_name = 'carvana-image-masking-challenge'
# Download data from Kaggle and unzip the files of interest. def load_data_from_zip(competition, file): with zipfile.ZipFile(os.path.join(competition, file), "r") as zip_ref: unzipped_file = zip_ref.namelist()[0] zip_ref.extractall(competition) def get_data(competition): kaggle.api.competition_download_files(competition, competition) load_data_from_zip(competition, 'train.zip') load_data_from_zip(competition, 'train_masks.zip') load_data_from_zip(competition, 'train_masks.csv.zip')
データをダウンロードする前に competition ルールを受諾 しなければなりません。
get_data(competition_name)
mg_dir = os.path.join(competition_name, "train") label_dir = os.path.join(competition_name, "train_masks")
df_train = pd.read_csv(os.path.join(competition_name, 'train_masks.csv')) ids_train = df_train['img'].map(lambda s: s.split('.')[0])
x_train_filenames = [] y_train_filenames = [] for img_id in ids_train: x_train_filenames.append(os.path.join(img_dir, "{}.jpg".format(img_id))) y_train_filenames.append(os.path.join(label_dir, "{}_mask.gif".format(img_id)))
x_train_filenames, x_val_filenames, y_train_filenames, y_val_filenames = \ train_test_split(x_train_filenames, y_train_filenames, test_size=0.2, random_state=42)
num_train_examples = len(x_train_filenames) num_val_examples = len(x_val_filenames) print("Number of training examples: {}".format(num_train_examples)) print("Number of validation examples: {}".format(num_val_examples))
パスがどのように見えるかがここにあります
x_train_filenames[:10]
y_train_filenames[:10]
可視化する
データセットの異なる画像のサンプルの幾つかを見てみましょう。
display_num = 5 r_choices = np.random.choice(num_train_examples, display_num) plt.figure(figsize=(10, 15)) for i in range(0, display_num * 2, 2): img_num = r_choices[i // 2] x_pathname = x_train_filenames[img_num] y_pathname = y_train_filenames[img_num] plt.subplot(display_num, 2, i + 1) plt.imshow(mpimg.imread(x_pathname)) plt.title("Original Image") example_labels = Image.open(y_pathname) label_vals = np.unique(example_labels) plt.subplot(display_num, 2, i + 2) plt.imshow(example_labels) plt.title("Masked Image") plt.suptitle("Examples of Images and their Masks") plt.show()
セットアップ
幾つかのパラメータの設定から始めましょう。画像の総ての shape を標準化してリサイズします。幾つかの訓練パラメータも設定します :
img_shape = (256, 256, 3) batch_size = 3 epochs = 5
これらの正確な同じパラメータの使用は貴方のハードウェアにとっては計算集約的過ぎるので、必要に応じてパラメータをいじります。また、UNet バージョンのアーキテクチャにより、画像サイズは 32 の factor で均等に割り切れなければなりません、何故ならば各 MaxPooling2Dlayer で 2 の factor で空間解像度をダウンサンプリングするからです。
貴方のマシンがそれをサポートできるのであれば、より高い解像度の入力画像 (e.g. 512 x 512) を使用してより良いパフォーマンスを獲得するでしょう、何故ならばこれはエンコーディングの間により正確な位置特定とより少ない情報の損失を可能にするからです。更に、モデルをより深くすることもできます。
代わりに、貴方のマシンがそれをサポートできないのであれば、画像解像度 and/or バッチサイズをより小さくしてください。画像解像度を小さくすればパフォーマンスを下げてバッチサイズを小さくすれば訓練時間を増大させます。
入力パイプラインを tf.data で構築する
ファイル名から始めますので、モデルと上手く戯れる、堅牢でスケーラブルなデータ・パイプラインを構築する必要があります。
入力パイプラインは次のステップから成ります :
- ファイル名からファイルのバイトを読みます – 画像とラベルの両者のために。ラベルは実際には自動車か背景 (1, 0) としてアノテートされた各ピクセルを持つ画像であることを思い出してください。
- バイトを画像フォーマットにデコードします。
- 画像変換を適用します: (オプション、入力パラメータに従って)
- resize – 画像を (eda か計算/メモリ制限により決定される) 標準サイズにリサイズします。
- これがオプションである理由は U-Net が完全畳み込みネットワーク (e.g. 完全結合ユニットを持たない) であるからでそのため入力サイズには依拠しません。けれども、もし画像をリサイズしないことを選択する場合、1 のバッチサイズを使用する必要があります、何故ならば可変な画像サイズを一緒にバッチ処理できないからです。
- 代わりに、同程度の画像のリサイズを回避するために画像をまとめてバケツに入れてそれらをミニバッチ毎にリサイズすることもできます、何故ならばリサイズは補間を通してパフォーマンスに影響を与えるかもしれないからです。
- hue_delta – ランダム factor で RGB 画像の色合いを調整します。これは実際の画像にのみ適用されます (ラベル画像ではなく)。hue_delta は区間 [0, 0.5] 内でなければなりません。
- horizontal_flip – 0.5 の確率で中心軸に沿って水平に画像を反転します。この変換はラベルと実際の画像の両者に適用されなければなりません。
- width_shift_range と height_shift_range は (総計の幅か高さの比率としての) 範囲で、その範囲内で画像を水平か垂直にランダムに変換します。この変換はラベルと実際の画像の両者に適用されなければなりません。
- rescale – 画像を特定の factor, e.g. 1/ 255 でリスケールします。
- resize – 画像を (eda か計算/メモリ制限により決定される) 標準サイズにリサイズします。
- データをシャッフルし、データを反復し (エポックに渡り複数回それを反復できます)、データをバッチ処理し、それからバッチを prefetch します (効率性のため)。
各パス名を処理する
def _process_pathnames(fname, label_path): # We map this function onto each pathname pair img_str = tf.read_file(fname) img = tf.image.decode_jpeg(img_str, channels=3) label_img_str = tf.read_file(label_path) # These are gif images so they return as (num_frames, h, w, c) label_img = tf.image.decode_gif(label_img_str)[0] # The label image should only have values of 1 or 0, indicating pixel wise # object (car) or not (background). We take the first channel only. label_img = label_img[:, :, 0] label_img = tf.expand_dims(label_img, axis=-1) return img, label_img
画像をシフトする
def shift_img(output_img, label_img, width_shift_range, height_shift_range): """This fn will perform the horizontal or vertical shift""" if width_shift_range or height_shift_range: if width_shift_range: width_shift_range = tf.random_uniform([], -width_shift_range * img_shape[1], width_shift_range * img_shape[1]) if height_shift_range: height_shift_range = tf.random_uniform([], -height_shift_range * img_shape[0], height_shift_range * img_shape[0]) # Translate both output_img = tfcontrib.image.translate(output_img, [width_shift_range, height_shift_range]) label_img = tfcontrib.image.translate(label_img, [width_shift_range, height_shift_range]) return output_img, label_img
画像をランダムに反転する
def flip_img(horizontal_flip, tr_img, label_img): if horizontal_flip: flip_prob = tf.random_uniform([], 0.0, 1.0) tr_img, label_img = tf.cond(tf.less(flip_prob, 0.5), lambda: (tf.image.flip_left_right(tr_img), tf.image.flip_left_right(label_img)), lambda: (tr_img, label_img)) return tr_img, label_img
変換を augment 関数に集約する
def _augment(img, label_img, resize=None, # Resize the image to some size e.g. [256, 256] scale=1, # Scale image e.g. 1 / 255. hue_delta=0, # Adjust the hue of an RGB image by random factor horizontal_flip=False, # Random left right flip, width_shift_range=0, # Randomly translate the image horizontally height_shift_range=0): # Randomly translate the image vertically if resize is not None: # Resize both images label_img = tf.image.resize_images(label_img, resize) img = tf.image.resize_images(img, resize) if hue_delta: img = tf.image.random_hue(img, hue_delta) img, label_img = flip_img(horizontal_flip, img, label_img) img, label_img = shift_img(img, label_img, width_shift_range, height_shift_range) label_img = tf.to_float(label_img) * scale img = tf.to_float(img) * scale return img, label_img
def get_baseline_dataset(filenames, labels, preproc_fn=functools.partial(_augment), threads=5, batch_size=batch_size, shuffle=True): num_x = len(filenames) # Create a dataset from the filenames and labels dataset = tf.data.Dataset.from_tensor_slices((filenames, labels)) # Map our preprocessing function to every element in our dataset, taking # advantage of multithreading dataset = dataset.map(_process_pathnames, num_parallel_calls=threads) if preproc_fn.keywords is not None and 'resize' not in preproc_fn.keywords: assert batch_size == 1, "Batching images must be of the same size" dataset = dataset.map(preproc_fn, num_parallel_calls=threads) if shuffle: dataset = dataset.shuffle(num_x) # It's necessary to repeat our data for all epochs dataset = dataset.repeat().batch(batch_size) return dataset
訓練と検証データセットをセットアップする
画像増強を訓練セットに適用しますが検証データセットには適用しないことに注意してください。
tr_cfg = { 'resize': [img_shape[0], img_shape[1]], 'scale': 1 / 255., 'hue_delta': 0.1, 'horizontal_flip': True, 'width_shift_range': 0.1, 'height_shift_range': 0.1 } tr_preprocessing_fn = functools.partial(_augment, **tr_cfg)
val_cfg = { 'resize': [img_shape[0], img_shape[1]], 'scale': 1 / 255., } val_preprocessing_fn = functools.partial(_augment, **val_cfg)
train_ds = get_baseline_dataset(x_train_filenames, y_train_filenames, preproc_fn=tr_preprocessing_fn, batch_size=batch_size) val_ds = get_baseline_dataset(x_val_filenames, y_val_filenames, preproc_fn=val_preprocessing_fn, batch_size=batch_size)
画像増強データパイプラインが期待された結果を生成するか見てみます
temp_ds = get_baseline_dataset(x_train_filenames, y_train_filenames, preproc_fn=tr_preprocessing_fn, batch_size=1, shuffle=False) # Let's examine some of these augmented images data_aug_iter = temp_ds.make_one_shot_iterator() next_element = data_aug_iter.get_next() with tf.Session() as sess: batch_of_imgs, label = sess.run(next_element) # Running next element in our graph will produce a batch of images plt.figure(figsize=(10, 10)) img = batch_of_imgs[0] plt.subplot(1, 2, 1) plt.imshow(img) plt.subplot(1, 2, 2) plt.imshow(label[0, :, :, 0]) plt.show()
モデルを構築する
U-Net モデルを構築します。U-net は特にセグメンテーション・タスクに良く、何故ならばそれは高解像度セグメンテーション・マスクを提供するために上手く位置特定できるからです。更に、それは小さいデータセットで上手く動作して overfitting に対して比較的堅牢です、画像内のパッチの数の観点から訓練データは訓練画像自身の数よりもはるかに巨大だからです。元のモデルと違い、ブロックの各々にバッチ正規化を追加します。
Unet はエンコーダ部とデコーダ部で構築されます。エンコーダ部は Conv, BatchNorm, そして MaxPool が続く Relu 演算の線形スタックから成ります。各 MaxPool は特徴マップの空間的解像度を 2 の factor で減じます。各ブロックの出力を追跡します、これらの高解像特徴マップをデコーダ部に供給しますので。デコーダ部は UpSampling2D, Conv, BatchNorm, と Relus から成ります。デコーダ側上で同じサイズの特徴マップを結合することに注意してください。最後に、各個別のピクセルのためのチャネルに沿って畳み込みを遂行する (kernel size of (1, 1)) 、最後の Conv 演算を追加します、これは最後のセグメンテーション・マスクをグレースケールで出力します。
Keras Functional API
Keras functional API は複数の入力/出力モデル、共有層, etc. を持つときに使用されます。それはパワフルな API で tensor を操作して結びついたデータストリームを持つ複雑なグラフを容易に構築することを可能にします。更にそれは層とモデルの両者を tensor 上で呼び出し可能にします。これらのヘルパー関数を構築します、モデルブロック演算を容易にに単純に調査させることを可能にします。
def conv_block(input_tensor, num_filters): encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor) encoder = layers.BatchNormalization()(encoder) encoder = layers.Activation('relu')(encoder) encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(encoder) encoder = layers.BatchNormalization()(encoder) encoder = layers.Activation('relu')(encoder) return encoder def encoder_block(input_tensor, num_filters): encoder = conv_block(input_tensor, num_filters) encoder_pool = layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder) return encoder_pool, encoder def decoder_block(input_tensor, concat_tensor, num_filters): decoder = layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor) decoder = layers.concatenate([concat_tensor, decoder], axis=-1) decoder = layers.BatchNormalization()(decoder) decoder = layers.Activation('relu')(decoder) decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder) decoder = layers.BatchNormalization()(decoder) decoder = layers.Activation('relu')(decoder) decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder) decoder = layers.BatchNormalization()(decoder) decoder = layers.Activation('relu')(decoder) return decoder
inputs = layers.Input(shape=img_shape) # 256 encoder0_pool, encoder0 = encoder_block(inputs, 32) # 128 encoder1_pool, encoder1 = encoder_block(encoder0_pool, 64) # 64 encoder2_pool, encoder2 = encoder_block(encoder1_pool, 128) # 32 encoder3_pool, encoder3 = encoder_block(encoder2_pool, 256) # 16 encoder4_pool, encoder4 = encoder_block(encoder3_pool, 512) # 8 center = conv_block(encoder4_pool, 1024) # center decoder4 = decoder_block(center, encoder4, 512) # 16 decoder3 = decoder_block(decoder4, encoder3, 256) # 32 decoder2 = decoder_block(decoder3, encoder2, 128) # 64 decoder1 = decoder_block(decoder2, encoder1, 64) # 128 decoder0 = decoder_block(decoder1, encoder0, 32) # 256 outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(decoder0)
モデルを定義する
functional API を使用して、モデルに関連する入力と出力を指定することによりモデルを定義しなければなりません。
model = models.Model(inputs=[inputs], outputs=[outputs])
カスタム・メトリックと損失関数を定義する
損失とメトリック関数の定義は Keras では単純です。与えられたサンプルに対する True ラベルと同じ与えられたサンプルに対する予測ラベルを取る関数を単純に定義します。
Dice 損失は overlap を計測するメトリックです。Dice 係数 (dice 損失) のための最適化についてのより多くの情報は ペーパー で見つかります、そこでそれが紹介されました。
ここでは dice 損失を使用します、何故ならば設計上それはクラス不均衡な問題でより良い遂行をするからです。そして、dice 係数と IoU メトリックの最大化は私達のセグメンテーション・タスクの実際の目的と目標です。交差エントロピーの使用はより代理的ですが、最大化するのはより簡単です。代わりに、目的を直接的に最大化します。
def dice_coeff(y_true, y_pred): smooth = 1. # Flatten y_true_f = tf.reshape(y_true, [-1]) y_pred_f = tf.reshape(y_pred, [-1]) intersection = tf.reduce_sum(y_true_f * y_pred_f) score = (2. * intersection + smooth) / (tf.reduce_sum(y_true_f) + tf.reduce_sum(y_pred_f) + smooth) return score
def dice_loss(y_true, y_pred): loss = 1 - dice_coeff(y_true, y_pred) return loss
ここで、バイナリ交差エントロピーと dice 損失を結合した特別な損失関数を使用します。これはより良い結果を経験的に得るためにこのコンペティションで競った人達に基づきます。パフォーマンスを測定するために貴方自身のカスタム損失を試してください (e.g. bce + log(dice_loss), only bce, etc.) !
def bce_dice_loss(y_true, y_pred): loss = losses.binary_crossentropy(y_true, y_pred) + dice_loss(y_true, y_pred) return loss
モデルをコンパイルする
最小化するためにカスタム損失関数を使用します。更に、訓練時にどのメトリックを追跡することを望むかを指定します。メトリックは、パラメータを調整するための訓練プロセスの間は実際には使用されず、しかし代わりに訓練プロセスのパフォーマンスを測定するために使用されることに注意してください。
model.compile(optimizer='adam', loss=bce_dice_loss, metrics=[dice_loss]) model.summary()
モデルを訓練する
tf.data でモデルを訓練することはモデルの fit 関数に訓練/検証データセット、ステップとエポック数を単純に提供することを伴います。
Model callback、ModelCheckpoint もまた含みます、これは各エポック後にモデルをディスクにセーブするものです。それが最高のパフォーマンスを示したモデルだけをセーブするようにそれを configure します。モデルのセーブは単なるモデルの重み以上を捕捉することに注意してください : デフォルトでは、それはモデル・アーキテクチャ、重み、そして optimizer の状態のような訓練プロセスについての情報をセーブします。
save_model_path = '/tmp/weights.hdf5' cp = tf.keras.callbacks.ModelCheckpoint(filepath=save_model_path, monitor='val_dice_loss', save_best_only=True, verbose=1)
fit 関数呼び出しでモデル callback を指定することを忘れないでください。
history = model.fit(train_ds, steps_per_epoch=int(np.ceil(num_train_examples / float(batch_size))), epochs=epochs, validation_data=val_ds, validation_steps=int(np.ceil(num_val_examples / float(batch_size))), callbacks=[cp])
訓練プロセスを可視化する
dice = history.history['dice_loss'] val_dice = history.history['val_dice_loss'] loss = history.history['loss'] val_loss = history.history['val_loss'] epochs_range = range(epochs) plt.figure(figsize=(16, 8)) plt.subplot(1, 2, 1) plt.plot(epochs_range, dice, label='Training Dice Loss') plt.plot(epochs_range, val_dice, label='Validation Dice Loss') plt.legend(loc='upper right') plt.title('Training and Validation Dice Loss') plt.subplot(1, 2, 2) plt.plot(epochs_range, loss, label='Training Loss') plt.plot(epochs_range, val_loss, label='Validation Loss') plt.legend(loc='upper right') plt.title('Training and Validation Loss') plt.show()
5 エポックだけでさえも、強いパフォーマンスを見ます。
実際のパフォーマンスを可視化する
検証セット上のパフォーマンスを可視化します。
実際の設定 (コンペティション、配備, etc.) では完全な画像解像度を持つテストセット上で評価します。
モデルをロードするためには 2 つのオプションを持ちます :
- モデル・アーキテクチャは既にメモリ内にあるので、単純に load_weights(save_model_path) を呼び出すことができます。
- モデルをスクラッチから (メモリ内のモデル・アーキテクチャを既に持つことなしに異なる設定で) ロードすることを望む場合、単純に次を呼び出します :
model = models.load_model(save_model_path, custom_objects={'bce_dice_loss': bce_dice_loss, 'dice_loss': dice_loss})
必要なカスタム・オブジェクト、損失とメトリックを指定します、これらはモデルを訓練するために使用しました。
# Alternatively, load the weights directly: model.load_weights(save_model_path) model = models.load_model(save_model_path, custom_objects={'bce_dice_loss': bce_dice_loss, 'dice_loss': dice_loss})
# Let's visualize some of the outputs data_aug_iter = val_ds.make_one_shot_iterator() next_element = data_aug_iter.get_next() # Running next element in our graph will produce a batch of images plt.figure(figsize=(10, 20)) for i in range(5): batch_of_imgs, label = tf.keras.backend.get_session().run(next_element) img = batch_of_imgs[0] predicted_label = model.predict(batch_of_imgs)[0] plt.subplot(5, 3, 3 * i + 1) plt.imshow(img) plt.title("Input image") plt.subplot(5, 3, 3 * i + 2) plt.imshow(label[0, :, :, 0]) plt.title("Actual Mask") plt.subplot(5, 3, 3 * i + 3) plt.imshow(predicted_label[:, :, 0]) plt.title("Predicted Mask") plt.suptitle("Examples of Input Image, Label, and Prediction") plt.show()
以上