ホーム » TensorFlow » TensorFlow : Tutorials : 生成モデル : 画像キャプショニング with Attention

TensorFlow : Tutorials : 生成モデル : 画像キャプショニング with Attention

TensorFlow : Tutorials : 生成モデル : 画像キャプショニング with Attention (翻訳/解説)

翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 09/05/2018

* TensorFlow 1.10 で更に改訂されています。
* TensorFlow 1.9 でドキュメント構成が変更され、数篇が新規に追加されましたので再翻訳しました。
* 本ページは、TensorFlow の本家サイトの Tutorials – Generative Models の以下のページを翻訳した上で
適宜、補足説明したものです:

* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 

画像キャプショニング with Attention

画像キャプショニングは画像のためのキャプション (訳注: 表題、短い説明文) を生成するタスクです。このような画像が与えられたとき :

Image Source, License: Public Domain

 
私達の目標は “a surfer riding on a wave” のようなキャプションを生成することです。ここでは、attention ベースのモデルを使用します。これは、(モデルが) キャプションを生成するとき画像のどのパートにモデルがフォーカスするかを見ることを可能にします。

下のこのモデル・アーキテクチャは Show, Attend and Tell: Neural Image Caption Generation with Visual Attention に類似しています。

コードは tf.keras と eager execution を使用します。

このノートブックは end-to-end なサンプルです。それを実行すれば、MS-COCO データセットをダウンロードして Inception V3 を使用して画像のサブセットを前処理してキャッシュして、エンコーダ-デコーダ・モデルを訓練して、そして新しい画像上でキャプションを生成するためにそれを使用します。

コードは TensorFlow バージョン >=1.9 を必要とします。

このサンプルでは、例として比較的小さい総量のデータ上で訓練しています。単一の P100 GPU 上で、このサンプルは訓練におよそ ~2 時間かかるでしょう。最初の 30,000 キャプション上で訓練します (シャッフルに依拠しておよそ ~20,000 画像上で訓練します、データセットの画像毎に複数のキャプションがあるからです)。

# Import TensorFlow and enable eager execution
# This code requires TensorFlow version >=1.9
import tensorflow as tf
tf.enable_eager_execution()

# We'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle

 

MS-COCO データセットをダウンロードして準備する

モデルを訓練するために MS-COCO データセットを使用します。このデータセットは >82,000 画像を含み、その各々は少なくとも 5 つの異なるキャプションでアノテートされています。下のコードはデータセットを自動的にダウンロードして抽出します。

警告: これから先に巨大なダウンロード。訓練セットを使用します、それは 13 GB ファイルです。

annotation_zip = tf.keras.utils.get_file('captions.zip', 
                                          cache_subdir=os.path.abspath('.'),
                                          origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                          extract = True)
annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'

name_of_zip = 'train2014.zip'
if not os.path.exists(os.path.abspath('.') + '/' + name_of_zip):
  image_zip = tf.keras.utils.get_file(name_of_zip, 
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
  PATH = os.path.dirname(image_zip)+'/train2014/'
else:
  PATH = os.path.abspath('.')+'/train2014/'

 

オプションで、より高速な訓練のために訓練セットのサイズを制限する

このサンプルのために、30,000 キャプションのサブセットを選択し、これと対応する画像をモデルを訓練するために使用します。通例のように、より多くのデータを使用することを選択すれば、キャプショニング品質は改善します。

# read the json file
with open(annotation_file, 'r') as f:
    annotations = json.load(f)

# storing the captions and the image name in vectors
all_captions = []
all_img_name_vector = []

for annot in annotations['annotations']:
    caption = ' ' + annot['caption'] + ' '
    image_id = annot['image_id']
    full_coco_image_path = PATH + 'COCO_train2014_' + '%012d.jpg' % (image_id)
    
    all_img_name_vector.append(full_coco_image_path)
    all_captions.append(caption)

# shuffling the captions and image_names together
# setting a random state
train_captions, img_name_vector = shuffle(all_captions,
                                          all_img_name_vector,
                                          random_state=1)

# selecting the first 30000 captions from the shuffled set
num_examples = 30000
train_captions = train_captions[:num_examples]
img_name_vector = img_name_vector[:num_examples]

len(train_captions), len(all_captions)

 

InceptionV3 を使用して画像を前処理する

次に、各画像を分類するために (Imagenet で事前訓練された) InceptionV3 を使用します。最後の畳込み層から特徴を抽出します。

最初に、画像を次により inceptionV3 が想定するフォーマットに変換する必要があります :

  • 画像を (299, 299) にリサイズします。
  • preprocess_input メソッドを使用してピクセルを -1 から 1 の範囲に置きます (InceptionV3 を訓練するために使用された画像のフォーマットに適合させるためです)。
def load_image(image_path):
    img = tf.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize_images(img, (299, 299))
    img = tf.keras.applications.inception_v3.preprocess_input(img)
    return img, image_path

 

InceptionV3 を初期化して事前訓練された Imagenet 重みをロードする

そのため、tf.keras モデルを作成します、そこでは出力層は InceptionV3 アーキテクチャの最後の畳み込み層です。

  • 各画像はネットワークを通して forward されて最後に得られるベクトルは辞書にストアされます (image_name –> feature_vector)。
  • 最後の畳み込み層を使用します、何故ならばこのサンプルで attention を使用しているからです。この層の出力の shape は 8x8x2048 です。
  • 訓練の間はこれを行なうことを回避しますので、ボトルネックにはなりません。
  • 総ての画像がネットワークを通された後、辞書を pickle 化してそれをディスクにセーブします。
mage_model = tf.keras.applications.InceptionV3(include_top=False, 
                                                weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output

image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

 

InceptionV3 から抽出した特徴をキャッシュする

各画像を InceptionV3 で前処理して出力をディスクにキャッシュします。出力の RAM 内のキャッシングはより高速ですがメモリ集約的で、画像毎に 8 * 8 * 2048 floats を必要とします。書く時点で、これは Colab のメモリ制限を超過するでしょう(これらは変更されるかもしれませんが、現在インスタンスはおよそ 12 GB メモリを持つようです)。

パフォーマンスは更に多いコードの代償でより洗練されたキャッシング・ストラテジー (e.g., ディスク I/O へのランダムアクセスを減じるために画像をシャーディングする) で改善されるかもしれません。

これは GPU を持つ Colab でおよそ 10 分間かかります。進捗バーを見ることを望むのであれば、次を行なって: install tqdm (!pip install tqdm)、そしてこの行を変更することができます :

for img, path in image_dataset:

to:

for img, path in tqdm(image_dataset):.
# getting the unique images
encode_train = sorted(set(img_name_vector))

# feel free to change the batch_size according to your system configuration
image_dataset = tf.data.Dataset.from_tensor_slices(
                                encode_train).map(load_image).batch(16)

for img, path in image_dataset:
  batch_features = image_features_extract_model(img)
  batch_features = tf.reshape(batch_features, 
                              (batch_features.shape[0], -1, batch_features.shape[3]))

  for bf, p in zip(batch_features, path):
    path_of_feature = p.numpy().decode("utf-8")
    np.save(path_of_feature, bf.numpy())

 

キャプションを前処理してトークン化する

  • 最初にキャプションをトークン化します (e.g., スペースで分割することにより)。これはデータの総ての一意な単語の語彙を与えます (e.g., “surfing”, “football”, etc)。
  • 次に、メモリを節約するために語彙サイズを top 5,000 単語に制限します。総ての他の単語をトークン “UNK” (for unknown) で置き替えます。
  • 最後に word –> index マッピング (and vice-versa) を作成します。
  • それから最長のものと同じ長さになるように総てのシークエンスをパッドします。
# This will find the maximum length of any caption in our dataset
def calc_max_length(tensor):
    return max(len(t) for t in tensor)
# The steps above is a general process of dealing with text processing

# choosing the top 5000 words from the vocabulary
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k, 
                                                  oov_token="", 
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)
train_seqs = tokenizer.texts_to_sequences(train_captions)
tokenizer.word_index = {key:value for key, value in tokenizer.word_index.items() if value <= top_k}
# putting  token in the word2idx dictionary
tokenizer.word_index[tokenizer.oov_token] = top_k + 1
tokenizer.word_index[''] = 0
# creating the tokenized vectors
train_seqs = tokenizer.texts_to_sequences(train_captions)
# creating a reverse mapping (index -> word)
index_word = {value:key for key, value in tokenizer.word_index.items()}
# padding each vector to the max_length of the captions
# if the max_length parameter is not provided, pad_sequences calculates that automatically
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
# calculating the max_length 
# used to store the attention weights
max_length = calc_max_length(train_seqs)

 

データを訓練とテストに分割する

# Create training and validation sets using 80-20 split
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector, 
                                                                    cap_vector, 
                                                                    test_size=0.2, 
                                                                    random_state=0)
len(img_name_train), len(cap_train), len(img_name_val), len(cap_val)

 

画像とキャプションの準備ができました!次に、モデルを訓練するために tf.data データセットを作成しましょう。

# feel free to change these parameters according to your system's configuration

BATCH_SIZE = 64
BUFFER_SIZE = 1000
embedding_dim = 256
units = 512
vocab_size = len(tokenizer.word_index)
# shape of the vector extracted from InceptionV3 is (64, 2048)
# these two variables represent that
features_shape = 2048
attention_features_shape = 64
# loading the numpy files 
def map_func(img_name, cap):
    img_tensor = np.load(img_name.decode('utf-8')+'.npy')
    return img_tensor, cap
dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

# using map to load the numpy files in parallel
# NOTE: Be sure to set num_parallel_calls to the number of CPU cores you have
# https://www.tensorflow.org/api_docs/python/tf/py_func
dataset = dataset.map(lambda item1, item2: tf.py_func(
          map_func, [item1, item2], [tf.float32, tf.int32]), num_parallel_calls=8)

# shuffling and batching
dataset = dataset.shuffle(BUFFER_SIZE)
# https://www.tensorflow.org/api_docs/python/tf/contrib/data/batch_and_drop_remainder
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(1)

 

モデル

面白いことに、下のデコーダは ニューラル機械翻訳 with Attention のためのサンプルのものと同一です。

モデル・アーキテクチャは Show, Attend and Tell ペーパーによりインスパイアされています。

  • このサンプルでは、InceptionV3 のより低い畳み込み層から特徴を抽出します、これは shape (8, 8, 2048) のベクトルを与えます。
  • それを (64, 2048) の shape に押しつぶします。
  • それからこのベクトルは CNN エンコーダを通して渡されます (単一の完全結合層から成ります)。
  • RNN (ここでは GRU) が次の単語を予測するために画像を注視します。
def gru(units):
  # If you have a GPU, we recommend using the CuDNNGRU layer (it provides a 
  # significant speedup).
  if tf.test.is_gpu_available():
    return tf.keras.layers.CuDNNGRU(units, 
                                    return_sequences=True, 
                                    return_state=True, 
                                    recurrent_initializer='glorot_uniform')
  else:
    return tf.keras.layers.GRU(units, 
                               return_sequences=True, 
                               return_state=True, 
                               recurrent_activation='sigmoid', 
                               recurrent_initializer='glorot_uniform')
class BahdanauAttention(tf.keras.Model):
  def __init__(self, units):
    super(BahdanauAttention, self).__init__()
    self.W1 = tf.keras.layers.Dense(units)
    self.W2 = tf.keras.layers.Dense(units)
    self.V = tf.keras.layers.Dense(1)
  
  def call(self, features, hidden):
    # features(CNN_encoder output) shape == (batch_size, 64, embedding_dim)
    
    # hidden shape == (batch_size, hidden_size)
    # hidden_with_time_axis shape == (batch_size, 1, hidden_size)
    hidden_with_time_axis = tf.expand_dims(hidden, 1)
    
    # score shape == (batch_size, 64, hidden_size)
    score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))
    
    # attention_weights shape == (batch_size, 64, 1)
    # we get 1 at the last axis because we are applying score to self.V
    attention_weights = tf.nn.softmax(self.V(score), axis=1)
    
    # context_vector shape after sum == (batch_size, hidden_size)
    context_vector = attention_weights * features
    context_vector = tf.reduce_sum(context_vector, axis=1)
    
    return context_vector, attention_weights
class CNN_Encoder(tf.keras.Model):
    # Since we have already extracted the features and dumped it using pickle
    # This encoder passes those features through a Fully connected layer
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        # shape after fc == (batch_size, 64, embedding_dim)
        self.fc = tf.keras.layers.Dense(embedding_dim)
        
    def call(self, x):
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x
class RNN_Decoder(tf.keras.Model):
  def __init__(self, embedding_dim, units, vocab_size):
    super(RNN_Decoder, self).__init__()
    self.units = units

    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = gru(self.units)
    self.fc1 = tf.keras.layers.Dense(self.units)
    self.fc2 = tf.keras.layers.Dense(vocab_size)
    
    self.attention = BahdanauAttention(self.units)
        
  def call(self, x, features, hidden):
    # defining attention as a separate model
    context_vector, attention_weights = self.attention(features, hidden)
    
    # x shape after passing through embedding == (batch_size, 1, embedding_dim)
    x = self.embedding(x)
    
    # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
    
    # passing the concatenated vector to the GRU
    output, state = self.gru(x)
    
    # shape == (batch_size, max_length, hidden_size)
    x = self.fc1(output)
    
    # x shape == (batch_size * max_length, hidden_size)
    x = tf.reshape(x, (-1, x.shape[2]))
    
    # output shape == (batch_size * max_length, vocab)
    x = self.fc2(x)

    return x, state, attention_weights

  def reset_state(self, batch_size):
    return tf.zeros((batch_size, self.units))
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)
optimizer = tf.train.AdamOptimizer()

# We are masking the loss calculated for padding
def loss_function(real, pred):
    mask = 1 - np.equal(real, 0)
    loss_ = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=real, logits=pred) * mask
    return tf.reduce_mean(loss_)

 

訓練

  • それぞれの .npy ファイルにストアされている特徴を抽出してからそれらの特徴をエンコーダを通して渡します。
  • エンコーダ出力、(0 に初期化された) 隠れ状態そしてデコーダ入力 (それは start トークンです) がデコーダに渡されます。
  • デコーダは予測とデコーダ隠れ状態を返します。
  • そしてデコーダ隠れ状態はモデルに渡し戻されて予測は損失を計算するために使用されます。
  • デコーダへの次の入力を決めるために teacher forcing を使用します。
  • teacher forcing はそこではターゲット単語がデコーダへの次の入力として渡されるようなテクニックです。
  • 最後のステップは勾配を計算してそれを optimizer に適用してそして backpropagate します。
# adding this in a separate cell because if you run the training cell 
# many times, the loss_plot array will be reset
loss_plot = []
EPOCHS = 20

for epoch in range(EPOCHS):
    start = time.time()
    total_loss = 0
    
    for (batch, (img_tensor, target)) in enumerate(dataset):
        loss = 0
        
        # initializing the hidden state for each batch
        # because the captions are not related from image to image
        hidden = decoder.reset_state(batch_size=target.shape[0])

        dec_input = tf.expand_dims([tokenizer.word_index['']] * BATCH_SIZE, 1)
        
        with tf.GradientTape() as tape:
            features = encoder(img_tensor)
            
            for i in range(1, target.shape[1]):
                # passing the features through the decoder
                predictions, hidden, _ = decoder(dec_input, features, hidden)

                loss += loss_function(target[:, i], predictions)
                
                # using teacher forcing
                dec_input = tf.expand_dims(target[:, i], 1)
        
        total_loss += (loss / int(target.shape[1]))
        
        variables = encoder.variables + decoder.variables
        
        gradients = tape.gradient(loss, variables) 
        
        optimizer.apply_gradients(zip(gradients, variables), tf.train.get_or_create_global_step())
        
        if batch % 100 == 0:
            print ('Epoch {} Batch {} Loss {:.4f}'.format(epoch + 1, 
                                                          batch, 
                                                          loss.numpy() / int(target.shape[1])))
    # storing the epoch end loss value to plot later
    loss_plot.append(total_loss / len(cap_vector))
    
    print ('Epoch {} Loss {:.6f}'.format(epoch + 1, 
                                         total_loss/len(cap_vector)))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
plt.plot(loss_plot)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss Plot')
plt.show()

 

キャプション!

  • evaluate 関数は訓練ループに類似しています、teacher forcing をここでは使用しないことを除いて。各時間ステップにおけるデコーダへの入力は隠れ状態とエンコーダ出力に加えてその前の予測です。
  • モデルが end トークンを予測するとき予測は停止します。
  • そして総ての時間ステップのために attention 重みをストアします。
def evaluate(image):
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        predicted_id = tf.multinomial(tf.exp(predictions), num_samples=1)[0][0].numpy()
        result.append(index_word[predicted_id])

        if index_word[predicted_id] == '':
            return result, attention_plot

        dec_input = tf.expand_dims([predicted_id], 0)

    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot
def plot_attention(image, result, attention_plot):
    temp_image = np.array(Image.open(image))

    fig = plt.figure(figsize=(10, 10))
    
    len_result = len(result)
    for l in range(len_result):
        temp_att = np.resize(attention_plot[l], (8, 8))
        ax = fig.add_subplot(len_result//2, len_result//2, l+1)
        ax.set_title(result[l])
        img = ax.imshow(temp_image)
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()
# captions on the validation set
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
real_caption = ' '.join([index_word[i] for i in cap_val[rid] if i not in [0]])
result, attention_plot = evaluate(image)

print ('Real Caption:', real_caption)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image, result, attention_plot)
# opening the image
Image.open(img_name_val[rid])

 

貴方自身の画像でそれを試してください

楽しみのために、貴方自身の画像を丁度訓練したモデルでキャプションするための貴方が使用できるメソッドを下で提供しました。留意してください、それはデータの比較的小さい量の上で訓練されました、そして貴方の画像は訓練データとは異なるかもしれません (そのため奇妙な結果に備えてください!)

image_url = 'https://tensorflow.org/images/surf.jpg'
image_extension = image_url[-4:]
image_path = tf.keras.utils.get_file('image'+image_extension, 
                                     origin=image_url)

result, attention_plot = evaluate(image_path)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image_path, result, attention_plot)
# opening the image
Image.open(image_path)
 

以上



AI導入支援 #2 ウェビナー

スモールスタートを可能としたAI導入支援   Vol.2
[無料 WEB セミナー] [詳細]
「画像認識 AI PoC スターターパック」の紹介
既に AI 技術を実ビジネスで活用し、成果を上げている日本企業も多く存在しており、競争優位なビジネスを展開しております。
しかしながら AI を導入したくとも PoC (概念実証) だけでも高額な費用がかかり取組めていない企業も少なくないようです。A I導入時には欠かせない PoC を手軽にしかも短期間で認知度を確認可能とするサービの紹介と共に、AI 技術の特性と具体的な導入プロセスに加え運用時のポイントについても解説いたします。
日時:2021年10月13日(水)
会場:WEBセミナー
共催:クラスキャット、日本FLOW(株)
後援:働き方改革推進コンソーシアム
参加費: 無料 (事前登録制)
人工知能開発支援
◆ クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
(株)クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com