TensorFlow : Tutorials : 生成モデル : 画像キャプショニング with Attention (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 09/05/2018
* TensorFlow 1.10 で更に改訂されています。
* TensorFlow 1.9 でドキュメント構成が変更され、数篇が新規に追加されましたので再翻訳しました。
* 本ページは、TensorFlow の本家サイトの Tutorials – Generative Models の以下のページを翻訳した上で
適宜、補足説明したものです:
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
画像キャプショニング with Attention
画像キャプショニングは画像のためのキャプション (訳注: 表題、短い説明文) を生成するタスクです。このような画像が与えられたとき :
私達の目標は “a surfer riding on a wave” のようなキャプションを生成することです。ここでは、attention ベースのモデルを使用します。これは、(モデルが) キャプションを生成するとき画像のどのパートにモデルがフォーカスするかを見ることを可能にします。
下のこのモデル・アーキテクチャは Show, Attend and Tell: Neural Image Caption Generation with Visual Attention に類似しています。
コードは tf.keras と eager execution を使用します。
このノートブックは end-to-end なサンプルです。それを実行すれば、MS-COCO データセットをダウンロードして Inception V3 を使用して画像のサブセットを前処理してキャッシュして、エンコーダ-デコーダ・モデルを訓練して、そして新しい画像上でキャプションを生成するためにそれを使用します。
コードは TensorFlow バージョン >=1.9 を必要とします。
このサンプルでは、例として比較的小さい総量のデータ上で訓練しています。単一の P100 GPU 上で、このサンプルは訓練におよそ ~2 時間かかるでしょう。最初の 30,000 キャプション上で訓練します (シャッフルに依拠しておよそ ~20,000 画像上で訓練します、データセットの画像毎に複数のキャプションがあるからです)。
# Import TensorFlow and enable eager execution # This code requires TensorFlow version >=1.9 import tensorflow as tf tf.enable_eager_execution() # We'll generate plots of attention in order to see which parts of an image # our model focuses on during captioning import matplotlib.pyplot as plt # Scikit-learn includes many helpful utilities from sklearn.model_selection import train_test_split from sklearn.utils import shuffle import re import numpy as np import os import time import json from glob import glob from PIL import Image import pickle
MS-COCO データセットをダウンロードして準備する
モデルを訓練するために MS-COCO データセットを使用します。このデータセットは >82,000 画像を含み、その各々は少なくとも 5 つの異なるキャプションでアノテートされています。下のコードはデータセットを自動的にダウンロードして抽出します。
警告: これから先に巨大なダウンロード。訓練セットを使用します、それは 13 GB ファイルです。
annotation_zip = tf.keras.utils.get_file('captions.zip', cache_subdir=os.path.abspath('.'), origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip', extract = True) annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json' name_of_zip = 'train2014.zip' if not os.path.exists(os.path.abspath('.') + '/' + name_of_zip): image_zip = tf.keras.utils.get_file(name_of_zip, cache_subdir=os.path.abspath('.'), origin = 'http://images.cocodataset.org/zips/train2014.zip', extract = True) PATH = os.path.dirname(image_zip)+'/train2014/' else: PATH = os.path.abspath('.')+'/train2014/'
オプションで、より高速な訓練のために訓練セットのサイズを制限する
このサンプルのために、30,000 キャプションのサブセットを選択し、これと対応する画像をモデルを訓練するために使用します。通例のように、より多くのデータを使用することを選択すれば、キャプショニング品質は改善します。
# read the json file with open(annotation_file, 'r') as f: annotations = json.load(f) # storing the captions and the image name in vectors all_captions = [] all_img_name_vector = [] for annot in annotations['annotations']: caption = '' + annot['caption'] + ' ' image_id = annot['image_id'] full_coco_image_path = PATH + 'COCO_train2014_' + '%012d.jpg' % (image_id) all_img_name_vector.append(full_coco_image_path) all_captions.append(caption) # shuffling the captions and image_names together # setting a random state train_captions, img_name_vector = shuffle(all_captions, all_img_name_vector, random_state=1) # selecting the first 30000 captions from the shuffled set num_examples = 30000 train_captions = train_captions[:num_examples] img_name_vector = img_name_vector[:num_examples]
len(train_captions), len(all_captions)
InceptionV3 を使用して画像を前処理する
次に、各画像を分類するために (Imagenet で事前訓練された) InceptionV3 を使用します。最後の畳込み層から特徴を抽出します。
最初に、画像を次により inceptionV3 が想定するフォーマットに変換する必要があります :
- 画像を (299, 299) にリサイズします。
- preprocess_input メソッドを使用してピクセルを -1 から 1 の範囲に置きます (InceptionV3 を訓練するために使用された画像のフォーマットに適合させるためです)。
def load_image(image_path): img = tf.read_file(image_path) img = tf.image.decode_jpeg(img, channels=3) img = tf.image.resize_images(img, (299, 299)) img = tf.keras.applications.inception_v3.preprocess_input(img) return img, image_path
InceptionV3 を初期化して事前訓練された Imagenet 重みをロードする
そのため、tf.keras モデルを作成します、そこでは出力層は InceptionV3 アーキテクチャの最後の畳み込み層です。
- 各画像はネットワークを通して forward されて最後に得られるベクトルは辞書にストアされます (image_name –> feature_vector)。
- 最後の畳み込み層を使用します、何故ならばこのサンプルで attention を使用しているからです。この層の出力の shape は 8x8x2048 です。
- 訓練の間はこれを行なうことを回避しますので、ボトルネックにはなりません。
- 総ての画像がネットワークを通された後、辞書を pickle 化してそれをディスクにセーブします。
mage_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet') new_input = image_model.input hidden_layer = image_model.layers[-1].output image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
InceptionV3 から抽出した特徴をキャッシュする
各画像を InceptionV3 で前処理して出力をディスクにキャッシュします。出力の RAM 内のキャッシングはより高速ですがメモリ集約的で、画像毎に 8 * 8 * 2048 floats を必要とします。書く時点で、これは Colab のメモリ制限を超過するでしょう(これらは変更されるかもしれませんが、現在インスタンスはおよそ 12 GB メモリを持つようです)。
パフォーマンスは更に多いコードの代償でより洗練されたキャッシング・ストラテジー (e.g., ディスク I/O へのランダムアクセスを減じるために画像をシャーディングする) で改善されるかもしれません。
これは GPU を持つ Colab でおよそ 10 分間かかります。進捗バーを見ることを望むのであれば、次を行なって: install tqdm (!pip install tqdm)、そしてこの行を変更することができます :
for img, path in image_dataset: to: for img, path in tqdm(image_dataset):.
# getting the unique images encode_train = sorted(set(img_name_vector)) # feel free to change the batch_size according to your system configuration image_dataset = tf.data.Dataset.from_tensor_slices( encode_train).map(load_image).batch(16) for img, path in image_dataset: batch_features = image_features_extract_model(img) batch_features = tf.reshape(batch_features, (batch_features.shape[0], -1, batch_features.shape[3])) for bf, p in zip(batch_features, path): path_of_feature = p.numpy().decode("utf-8") np.save(path_of_feature, bf.numpy())
キャプションを前処理してトークン化する
- 最初にキャプションをトークン化します (e.g., スペースで分割することにより)。これはデータの総ての一意な単語の語彙を与えます (e.g., “surfing”, “football”, etc)。
- 次に、メモリを節約するために語彙サイズを top 5,000 単語に制限します。総ての他の単語をトークン “UNK” (for unknown) で置き替えます。
- 最後に word –> index マッピング (and vice-versa) を作成します。
- それから最長のものと同じ長さになるように総てのシークエンスをパッドします。
# This will find the maximum length of any caption in our dataset def calc_max_length(tensor): return max(len(t) for t in tensor)
# The steps above is a general process of dealing with text processing # choosing the top 5000 words from the vocabulary top_k = 5000 tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k, oov_token="", filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ') tokenizer.fit_on_texts(train_captions) train_seqs = tokenizer.texts_to_sequences(train_captions)
tokenizer.word_index = {key:value for key, value in tokenizer.word_index.items() if value <= top_k} # puttingtoken in the word2idx dictionary tokenizer.word_index[tokenizer.oov_token] = top_k + 1 tokenizer.word_index[' '] = 0
# creating the tokenized vectors train_seqs = tokenizer.texts_to_sequences(train_captions)
# creating a reverse mapping (index -> word) index_word = {value:key for key, value in tokenizer.word_index.items()}
# padding each vector to the max_length of the captions # if the max_length parameter is not provided, pad_sequences calculates that automatically cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
# calculating the max_length # used to store the attention weights max_length = calc_max_length(train_seqs)
データを訓練とテストに分割する
# Create training and validation sets using 80-20 split img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector, cap_vector, test_size=0.2, random_state=0)
len(img_name_train), len(cap_train), len(img_name_val), len(cap_val)
画像とキャプションの準備ができました!次に、モデルを訓練するために tf.data データセットを作成しましょう。
# feel free to change these parameters according to your system's configuration BATCH_SIZE = 64 BUFFER_SIZE = 1000 embedding_dim = 256 units = 512 vocab_size = len(tokenizer.word_index) # shape of the vector extracted from InceptionV3 is (64, 2048) # these two variables represent that features_shape = 2048 attention_features_shape = 64
# loading the numpy files def map_func(img_name, cap): img_tensor = np.load(img_name.decode('utf-8')+'.npy') return img_tensor, cap
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train)) # using map to load the numpy files in parallel # NOTE: Be sure to set num_parallel_calls to the number of CPU cores you have # https://www.tensorflow.org/api_docs/python/tf/py_func dataset = dataset.map(lambda item1, item2: tf.py_func( map_func, [item1, item2], [tf.float32, tf.int32]), num_parallel_calls=8) # shuffling and batching dataset = dataset.shuffle(BUFFER_SIZE) # https://www.tensorflow.org/api_docs/python/tf/contrib/data/batch_and_drop_remainder dataset = dataset.batch(BATCH_SIZE) dataset = dataset.prefetch(1)
モデル
面白いことに、下のデコーダは ニューラル機械翻訳 with Attention のためのサンプルのものと同一です。
モデル・アーキテクチャは Show, Attend and Tell ペーパーによりインスパイアされています。
- このサンプルでは、InceptionV3 のより低い畳み込み層から特徴を抽出します、これは shape (8, 8, 2048) のベクトルを与えます。
- それを (64, 2048) の shape に押しつぶします。
- それからこのベクトルは CNN エンコーダを通して渡されます (単一の完全結合層から成ります)。
- RNN (ここでは GRU) が次の単語を予測するために画像を注視します。
def gru(units): # If you have a GPU, we recommend using the CuDNNGRU layer (it provides a # significant speedup). if tf.test.is_gpu_available(): return tf.keras.layers.CuDNNGRU(units, return_sequences=True, return_state=True, recurrent_initializer='glorot_uniform') else: return tf.keras.layers.GRU(units, return_sequences=True, return_state=True, recurrent_activation='sigmoid', recurrent_initializer='glorot_uniform')
class BahdanauAttention(tf.keras.Model): def __init__(self, units): super(BahdanauAttention, self).__init__() self.W1 = tf.keras.layers.Dense(units) self.W2 = tf.keras.layers.Dense(units) self.V = tf.keras.layers.Dense(1) def call(self, features, hidden): # features(CNN_encoder output) shape == (batch_size, 64, embedding_dim) # hidden shape == (batch_size, hidden_size) # hidden_with_time_axis shape == (batch_size, 1, hidden_size) hidden_with_time_axis = tf.expand_dims(hidden, 1) # score shape == (batch_size, 64, hidden_size) score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis)) # attention_weights shape == (batch_size, 64, 1) # we get 1 at the last axis because we are applying score to self.V attention_weights = tf.nn.softmax(self.V(score), axis=1) # context_vector shape after sum == (batch_size, hidden_size) context_vector = attention_weights * features context_vector = tf.reduce_sum(context_vector, axis=1) return context_vector, attention_weights
class CNN_Encoder(tf.keras.Model): # Since we have already extracted the features and dumped it using pickle # This encoder passes those features through a Fully connected layer def __init__(self, embedding_dim): super(CNN_Encoder, self).__init__() # shape after fc == (batch_size, 64, embedding_dim) self.fc = tf.keras.layers.Dense(embedding_dim) def call(self, x): x = self.fc(x) x = tf.nn.relu(x) return x
class RNN_Decoder(tf.keras.Model): def __init__(self, embedding_dim, units, vocab_size): super(RNN_Decoder, self).__init__() self.units = units self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) self.gru = gru(self.units) self.fc1 = tf.keras.layers.Dense(self.units) self.fc2 = tf.keras.layers.Dense(vocab_size) self.attention = BahdanauAttention(self.units) def call(self, x, features, hidden): # defining attention as a separate model context_vector, attention_weights = self.attention(features, hidden) # x shape after passing through embedding == (batch_size, 1, embedding_dim) x = self.embedding(x) # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size) x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # passing the concatenated vector to the GRU output, state = self.gru(x) # shape == (batch_size, max_length, hidden_size) x = self.fc1(output) # x shape == (batch_size * max_length, hidden_size) x = tf.reshape(x, (-1, x.shape[2])) # output shape == (batch_size * max_length, vocab) x = self.fc2(x) return x, state, attention_weights def reset_state(self, batch_size): return tf.zeros((batch_size, self.units))
encoder = CNN_Encoder(embedding_dim) decoder = RNN_Decoder(embedding_dim, units, vocab_size)
optimizer = tf.train.AdamOptimizer() # We are masking the loss calculated for padding def loss_function(real, pred): mask = 1 - np.equal(real, 0) loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask return tf.reduce_mean(loss_)
訓練
- それぞれの .npy ファイルにストアされている特徴を抽出してからそれらの特徴をエンコーダを通して渡します。
- エンコーダ出力、(0 に初期化された) 隠れ状態そしてデコーダ入力 (それは start トークンです) がデコーダに渡されます。
- デコーダは予測とデコーダ隠れ状態を返します。
- そしてデコーダ隠れ状態はモデルに渡し戻されて予測は損失を計算するために使用されます。
- デコーダへの次の入力を決めるために teacher forcing を使用します。
- teacher forcing はそこではターゲット単語がデコーダへの次の入力として渡されるようなテクニックです。
- 最後のステップは勾配を計算してそれを optimizer に適用してそして backpropagate します。
# adding this in a separate cell because if you run the training cell # many times, the loss_plot array will be reset loss_plot = []
EPOCHS = 20 for epoch in range(EPOCHS): start = time.time() total_loss = 0 for (batch, (img_tensor, target)) in enumerate(dataset): loss = 0 # initializing the hidden state for each batch # because the captions are not related from image to image hidden = decoder.reset_state(batch_size=target.shape[0]) dec_input = tf.expand_dims([tokenizer.word_index['']] * BATCH_SIZE, 1) with tf.GradientTape() as tape: features = encoder(img_tensor) for i in range(1, target.shape[1]): # passing the features through the decoder predictions, hidden, _ = decoder(dec_input, features, hidden) loss += loss_function(target[:, i], predictions) # using teacher forcing dec_input = tf.expand_dims(target[:, i], 1) total_loss += (loss / int(target.shape[1])) variables = encoder.variables + decoder.variables gradients = tape.gradient(loss, variables) optimizer.apply_gradients(zip(gradients, variables), tf.train.get_or_create_global_step()) if batch % 100 == 0: print ('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1, batch, loss.numpy() / int(target.shape[1]))) # storing the epoch end loss value to plot later loss_plot.append(total_loss / len(cap_vector)) print ('Epoch {} Loss {:.6f}'.format(epoch + 1, total_loss/len(cap_vector))) print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
plt.plot(loss_plot) plt.xlabel('Epochs') plt.ylabel('Loss') plt.title('Loss Plot') plt.show()
キャプション!
- evaluate 関数は訓練ループに類似しています、teacher forcing をここでは使用しないことを除いて。各時間ステップにおけるデコーダへの入力は隠れ状態とエンコーダ出力に加えてその前の予測です。
- モデルが end トークンを予測するとき予測は停止します。
- そして総ての時間ステップのために attention 重みをストアします。
def evaluate(image): attention_plot = np.zeros((max_length, attention_features_shape)) hidden = decoder.reset_state(batch_size=1) temp_input = tf.expand_dims(load_image(image)[0], 0) img_tensor_val = image_features_extract_model(temp_input) img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3])) features = encoder(img_tensor_val) dec_input = tf.expand_dims([tokenizer.word_index['']], 0) result = [] for i in range(max_length): predictions, hidden, attention_weights = decoder(dec_input, features, hidden) attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy() predicted_id = tf.multinomial(tf.exp(predictions), num_samples=1)[0][0].numpy() result.append(index_word[predicted_id]) if index_word[predicted_id] == ' ': return result, attention_plot dec_input = tf.expand_dims([predicted_id], 0) attention_plot = attention_plot[:len(result), :] return result, attention_plot
def plot_attention(image, result, attention_plot): temp_image = np.array(Image.open(image)) fig = plt.figure(figsize=(10, 10)) len_result = len(result) for l in range(len_result): temp_att = np.resize(attention_plot[l], (8, 8)) ax = fig.add_subplot(len_result//2, len_result//2, l+1) ax.set_title(result[l]) img = ax.imshow(temp_image) ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent()) plt.tight_layout() plt.show()
# captions on the validation set rid = np.random.randint(0, len(img_name_val)) image = img_name_val[rid] real_caption = ' '.join([index_word[i] for i in cap_val[rid] if i not in [0]]) result, attention_plot = evaluate(image) print ('Real Caption:', real_caption) print ('Prediction Caption:', ' '.join(result)) plot_attention(image, result, attention_plot) # opening the image Image.open(img_name_val[rid])
貴方自身の画像でそれを試してください
楽しみのために、貴方自身の画像を丁度訓練したモデルでキャプションするための貴方が使用できるメソッドを下で提供しました。留意してください、それはデータの比較的小さい量の上で訓練されました、そして貴方の画像は訓練データとは異なるかもしれません (そのため奇妙な結果に備えてください!)
image_url = 'https://tensorflow.org/images/surf.jpg' image_extension = image_url[-4:] image_path = tf.keras.utils.get_file('image'+image_extension, origin=image_url) result, attention_plot = evaluate(image_path) print ('Prediction Caption:', ' '.join(result)) plot_attention(image_path, result, attention_plot) # opening the image Image.open(image_path)
以上