Keras 2 : examples : NLP – sequence-to-sequence Transformer による英西翻訳 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 05/31/2022 (keras 2.9.0)
* 本ページは、Keras の以下のドキュメントを翻訳した上で適宜、補足説明したものです:
- Code examples : Natural Language Processing : English-to-Spanish translation with a sequence-to-sequence Transformer (Author: fchollet)
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
- 人工知能研究開発支援
- 人工知能研修サービス(経営者層向けオンサイト研修)
- テクニカルコンサルティングサービス
- 実証実験(プロトタイプ構築)
- アプリケーションへの実装
- 人工知能研修サービス
- PoC(概念実証)を失敗させないための支援
- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
- 株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
- sales-info@classcat.com ; Web: www.classcat.com ; ClassCatJP
Keras 2 : examples : 自然言語処理 – sequence-to-sequence Transformer による英西翻訳
Description : sequence-to-sequence Transformer の実装と機械翻訳タスクでのその訓練。
イントロダクション
この例では、sequence-to-sequence Transformer モデルを構築します、これを英西機械翻訳タスクで訓練します。
以下の方法を学習します :
- Keras TextVectorization 層を使用してテキストをベクトル化します。
- TransformerEncoder 層, TransformerDecoder 層, そして PositionalEmbedding 層を実装します。
- sequence-to-sequence モデルを訓練するためのデータを準備します。
- 未見の入力センテンスの翻訳を生成するために訓練済みモデルを使用します (sequence-to-sequence 推論)。
セットアップ
import pathlib
import random
import string
import re
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import TextVectorization
データのダウンロード
Anki により提供される英西翻訳データセットで作業していきます。それをダウンロードしましょう :
text_file = keras.utils.get_file(
fname="spa-eng.zip",
origin="http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip",
extract=True,
)
text_file = pathlib.Path(text_file).parent / "spa-eng" / "spa.txt"
データの解析
各行は英語のセンテンスと対応するスペイン語のセンテンスを含みます。英語のセンテンスはソース・シークエンスで、スペイン語のはターゲット・シークエンスですスペイン語のセンテンスの先頭にトークン “[start]” を追加してトークン “[end]” を追加します。
with open(text_file) as f:
lines = f.read().split("\n")[:-1]
text_pairs = []
for line in lines:
eng, spa = line.split("\t")
spa = "[start] " + spa + " [end]"
text_pairs.append((eng, spa))
ここにセンテンスペアがどのように見えるかがあります :
for _ in range(5):
print(random.choice(text_pairs))
("You can dance, can't you?", '[start] Puedes bailar, ¿verdad? [end]') ('I passed by her house yesterday.', '[start] Me pasé por su casa ayer. [end]') ('I like tulips.', '[start] Me gustan los tulipanes. [end]') ('He is fluent in French.', '[start] Habla un francés fluido. [end]') ('Tom asked me what I had been doing.', '[start] Tom me preguntó qué había estado haciendo. [end]')
そして、センテンスペアを訓練セット, 検証セット, そしてテストセットに分割します。
random.shuffle(text_pairs)
num_val_samples = int(0.15 * len(text_pairs))
num_train_samples = len(text_pairs) - 2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[num_train_samples + num_val_samples :]
print(f"{len(text_pairs)} total pairs")
print(f"{len(train_pairs)} training pairs")
print(f"{len(val_pairs)} validation pairs")
print(f"{len(test_pairs)} test pairs")
118964 total pairs 83276 training pairs 17844 validation pairs 17844 test pairs
テキストデータのベクトル化
テキストデータをベクトル化するために TextVectorization 層の 2 つのインスタンスを使用します (1 つは英語のためで 1 つはスペイン語のため)、つまり、元の文字列を整数シークエンスに変換します、そこでは各整数は語彙の単語のインデックスを表します。
英語層はデフォルトの文字列標準化 (句読点文字の除去) と分割スキーム (空白で分割) を使用し、スペイン語層はカスタム標準化を使用します、そこでは除去される句読点文字のセットに文字 “¿” を追加します。
Note : 製品レベルの機械翻訳モデルでは、いずれの言語でも句読点文字を除去することは勧めません。代わりに、各句読点文字をそれ自身のトークンに変換することを勧めます、これは TextVectorization 層へのカスタム split 関数を提供することで実現できるでしょう。
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
vocab_size = 15000
sequence_length = 20
batch_size = 64
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string)
return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")
eng_vectorization = TextVectorization(
max_tokens=vocab_size, output_mode="int", output_sequence_length=sequence_length,
)
spa_vectorization = TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
)
train_eng_texts = [pair[0] for pair in train_pairs]
train_spa_texts = [pair[1] for pair in train_pairs]
eng_vectorization.adapt(train_eng_texts)
spa_vectorization.adapt(train_spa_texts)
次に、データセットを形式化します。
各訓練ステップで、モデルはソースセンテンスとターゲット単語 0 から N を使用してターゲット単語 N+1 (そしてそれ以降) を予測しようとします。
そのようなものとして、訓練データセットはタプル (inputs, targets) を yield します、ここで :
- input はキー encoder_inputs と decoder_inputs を持つ辞書です。encoder_inputs はベクトル化されたソースセンテンスで、decoder_inputs は「それまでの」ターゲットセンテンスです、つまり、ターゲットセンテンスの単語 N+1 (そしてそれ以降) を予測するために使用される単語 0 から N です。
- target はターゲットセット・センテンスの 1 ステップ刻みのオフセットです : それはターゲット・センテンスの次の単語を提供します。それはモデルが予測しようとするものです。
def format_dataset(eng, spa):
eng = eng_vectorization(eng)
spa = spa_vectorization(spa)
return ({"encoder_inputs": eng, "decoder_inputs": spa[:, :-1],}, spa[:, 1:])
def make_dataset(pairs):
eng_texts, spa_texts = zip(*pairs)
eng_texts = list(eng_texts)
spa_texts = list(spa_texts)
dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
dataset = dataset.batch(batch_size)
dataset = dataset.map(format_dataset)
return dataset.shuffle(2048).prefetch(16).cache()
train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
シークエンスの shape を素早く見てみましょう (64 ペアのバッチを持ち、そして総てのシークエンスは 20 ステップ長です) :
for inputs, targets in train_ds.take(1):
print(f'inputs["encoder_inputs"].shape: {inputs["encoder_inputs"].shape}')
print(f'inputs["decoder_inputs"].shape: {inputs["decoder_inputs"].shape}')
print(f"targets.shape: {targets.shape}")
inputs["encoder_inputs"].shape: (64, 20) inputs["decoder_inputs"].shape: (64, 20) targets.shape: (64, 20)
モデルの構築
私達の sequence-to-sequence Transformer は一緒に連結された TransformerEncoder と TransformerDecoder から構成されます。モデルが単語順序を近くするように、PositionalEmbedding 層 も使用します。
ソース・シークエンスは TransformerEncoder に渡されます、これはその新しい表現を生成します。そしてこの新しい表現は、それまでのターゲット・シークエンス (ターゲット単語 0 から N) と共に、TransformerDecoder に渡されます。そして TransformerDecoder はターゲット・シークエンスの次の単語 (N+1 とそれ以降) を予測しようとします。
これを可能にする主要な詳細は causal マスキングです (TransformerDecoder のメソッド get_causal_attention_mask() 参照)。TransformerDecoder はシークエンス全体を一度に見ますので、それがトークン N+1 を予測するときターゲットトークン 0 から N からの情報だけを使用することを確実にしなければなりません (そうでないなら、未来からの情報を使用できるでしょう、これは推論時に使用できないモデルという結果になります)。
class TransformerEncoder(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super(TransformerEncoder, self).__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim
)
self.dense_proj = keras.Sequential(
[layers.Dense(dense_dim, activation="relu"), layers.Dense(embed_dim),]
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.supports_masking = True
def call(self, inputs, mask=None):
if mask is not None:
padding_mask = tf.cast(mask[:, tf.newaxis, tf.newaxis, :], dtype="int32")
attention_output = self.attention(
query=inputs, value=inputs, key=inputs, attention_mask=padding_mask
)
proj_input = self.layernorm_1(inputs + attention_output)
proj_output = self.dense_proj(proj_input)
return self.layernorm_2(proj_input + proj_output)
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
super(PositionalEmbedding, self).__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=vocab_size, output_dim=embed_dim
)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=embed_dim
)
self.sequence_length = sequence_length
self.vocab_size = vocab_size
self.embed_dim = embed_dim
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
class TransformerDecoder(layers.Layer):
def __init__(self, embed_dim, latent_dim, num_heads, **kwargs):
super(TransformerDecoder, self).__init__(**kwargs)
self.embed_dim = embed_dim
self.latent_dim = latent_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim
)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim
)
self.dense_proj = keras.Sequential(
[layers.Dense(latent_dim, activation="relu"), layers.Dense(embed_dim),]
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.supports_masking = True
def call(self, inputs, encoder_outputs, mask=None):
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(mask[:, tf.newaxis, :], dtype="int32")
padding_mask = tf.minimum(padding_mask, causal_mask)
attention_output_1 = self.attention_1(
query=inputs, value=inputs, key=inputs, attention_mask=causal_mask
)
out_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=out_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
)
out_2 = self.layernorm_2(out_1 + attention_output_2)
proj_output = self.dense_proj(out_2)
return self.layernorm_3(out_2 + proj_output)
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
axis=0,
)
return tf.tile(mask, mult)
次に、end-to-end モデルを構成します。
embed_dim = 256
latent_dim = 2048
num_heads = 8
encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, latent_dim, num_heads)(x)
encoder = keras.Model(encoder_inputs, encoder_outputs)
decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
encoded_seq_inputs = keras.Input(shape=(None, embed_dim), name="decoder_state_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, latent_dim, num_heads)(x, encoded_seq_inputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
decoder = keras.Model([decoder_inputs, encoded_seq_inputs], decoder_outputs)
decoder_outputs = decoder([decoder_inputs, encoder_outputs])
transformer = keras.Model(
[encoder_inputs, decoder_inputs], decoder_outputs, name="transformer"
)
モデルの訓練
検証データで訓練進捗を監視するために素早い方法として精度を使用します。機械翻訳は典型的には精度ではなく、他のメトリクスとともに BLUE スコアを使用することに注意してください。
ここでは 1 エポック訓練するだけですが、実際に収束するモデルを得るには少なくとも 30 エポックは訓練するべきです。
epochs = 1 # This should be at least 30 for convergence
transformer.summary()
transformer.compile(
"rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
)
transformer.fit(train_ds, epochs=epochs, validation_data=val_ds)
Model: "transformer" __________________________________________________________________________________________________ Layer (type) Output Shape Param # Connected to ================================================================================================== encoder_inputs (InputLayer) [(None, None)] 0 __________________________________________________________________________________________________ positional_embedding (Positiona (None, None, 256) 3845120 encoder_inputs[0][0] __________________________________________________________________________________________________ decoder_inputs (InputLayer) [(None, None)] 0 __________________________________________________________________________________________________ transformer_encoder (Transforme (None, None, 256) 3155456 positional_embedding[0][0] __________________________________________________________________________________________________ model_1 (Functional) (None, None, 15000) 12959640 decoder_inputs[0][0] transformer_encoder[0][0] ================================================================================================== Total params: 19,960,216 Trainable params: 19,960,216 Non-trainable params: 0 __________________________________________________________________________________________________ 1302/1302 [==============================] - 1297s 993ms/step - loss: 1.6495 - accuracy: 0.4284 - val_loss: 1.2843 - val_accuracy: 0.5211 <tensorflow.python.keras.callbacks.History at 0x164a6c250>
テストセンテンスのデコード
最後に、真新しい英語センテンスを翻訳する方法を実演しましょう。単純にモデルにベクトル化された英語センテンスとターゲットトークン “[start]” を供給してから、トークン “[start]” にぶつかるまで、次のトークンを反復的に生成します。
spa_vocab = spa_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20
def decode_sequence(input_sentence):
tokenized_input_sentence = eng_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = spa_vectorization([decoded_sentence])[:, :-1]
predictions = transformer([tokenized_input_sentence, tokenized_target_sentence])
sampled_token_index = np.argmax(predictions[0, i, :])
sampled_token = spa_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence
test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(30):
input_sentence = random.choice(test_eng_texts)
translated = decode_sequence(input_sentence)
30 エポック後、以下のような結果を得ます :
She handed him the money. [start] ella le pasó el dinero [end] Tom has never heard Mary sing. [start] tom nunca ha oído cantar a mary [end] Perhaps she will come tomorrow. [start] tal vez ella vendrá mañana [end] I love to write. [start] me encanta escribir [end] His French is improving little by little. [start] su francés va a [UNK] sólo un poco [end] My hotel told me to call you. [start] mi hotel me dijo que te [UNK] [end]
以上