Keras 2 : examples : NLP – BERT によるテキスト抽出 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 06/08/2022 (keras 2.9.0)
* 本ページは、Keras の以下のドキュメントを翻訳した上で適宜、補足説明したものです:
- Code examples : Natural Language Processing : Text Extraction with BERT (Author: Apoorv Nandan)
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
- 人工知能研究開発支援
- 人工知能研修サービス(経営者層向けオンサイト研修)
- テクニカルコンサルティングサービス
- 実証実験(プロトタイプ構築)
- アプリケーションへの実装
- 人工知能研修サービス
- PoC(概念実証)を失敗させないための支援
- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
- 株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
- sales-info@classcat.com ; Web: www.classcat.com ; ClassCatJP
Keras 2 : examples : 自然言語処理 – BERT によるテキスト抽出
Description : HuggingFace Transformers の事前訓練済み BERT の SQuAD 上での再調整。
イントロダクション
このデモは SQuAD (Stanford 質問応答データセット) を使用します。SQuAD では、入力は質問とコンテキストのパラグラフから構成されます。目標は、質問に答えるパラグラフのテキストのスパンを見つけることです。このデータ上の性能は「完全一致」(= Exact Match) メトリックで評価します、これは正解の任意の一つに正確に一致した予測のパーセンテージを測定します。
このタスクを遂行するために BERT モデルを以下のように最調整します :
- BERT への入力としてコンテキストと質問を供給します。
- BERT の隠れ状態の次元に等しい次元を持つ 2 つのベクトル S と T を取ります。
- 各トークンが回答のスパンの開始と終了である確率を計算します。トークンが回答の開始である確率は S と BERT の (総てのトークンに対する softmax が続く) 最終層のトークンの表現のドット積で与えられます。トークンが回答の終了である確率は同様にベクトル T で計算されます。
- BERT を最調整してその過程で S と T を学習します。
References:
セットアップ
import os
import re
import json
import string
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tokenizers import BertWordPieceTokenizer
from transformers import BertTokenizer, TFBertModel, BertConfig
max_len = 384
configuration = BertConfig() # default parameters and configuration for BERT
BERT トークナイザーのセットアップ
# Save the slow pretrained tokenizer
slow_tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
save_path = "bert_base_uncased/"
if not os.path.exists(save_path):
os.makedirs(save_path)
slow_tokenizer.save_pretrained(save_path)
# Load the fast tokenizer from saved file
tokenizer = BertWordPieceTokenizer("bert_base_uncased/vocab.txt", lowercase=True)
データのロード
train_data_url = "https://rajpurkar.github.io/SQuAD-explorer/dataset/train-v1.1.json"
train_path = keras.utils.get_file("train.json", train_data_url)
eval_data_url = "https://rajpurkar.github.io/SQuAD-explorer/dataset/dev-v1.1.json"
eval_path = keras.utils.get_file("eval.json", eval_data_url)
データの前処理
- JSON ファイルを通り抜けて総てのレコードを SquadExample オブジェクトとしてストアします。
- 各 SquadExample を通り抜けて create x_train, y_train, x_eval, y_eval を作成します。
class SquadExample:
def __init__(self, question, context, start_char_idx, answer_text, all_answers):
self.question = question
self.context = context
self.start_char_idx = start_char_idx
self.answer_text = answer_text
self.all_answers = all_answers
self.skip = False
def preprocess(self):
context = self.context
question = self.question
answer_text = self.answer_text
start_char_idx = self.start_char_idx
# Clean context, answer and question
context = " ".join(str(context).split())
question = " ".join(str(question).split())
answer = " ".join(str(answer_text).split())
# Find end character index of answer in context
end_char_idx = start_char_idx + len(answer)
if end_char_idx >= len(context):
self.skip = True
return
# Mark the character indexes in context that are in answer
is_char_in_ans = [0] * len(context)
for idx in range(start_char_idx, end_char_idx):
is_char_in_ans[idx] = 1
# Tokenize context
tokenized_context = tokenizer.encode(context)
# Find tokens that were created from answer characters
ans_token_idx = []
for idx, (start, end) in enumerate(tokenized_context.offsets):
if sum(is_char_in_ans[start:end]) > 0:
ans_token_idx.append(idx)
if len(ans_token_idx) == 0:
self.skip = True
return
# Find start and end token index for tokens from answer
start_token_idx = ans_token_idx[0]
end_token_idx = ans_token_idx[-1]
# Tokenize question
tokenized_question = tokenizer.encode(question)
# Create inputs
input_ids = tokenized_context.ids + tokenized_question.ids[1:]
token_type_ids = [0] * len(tokenized_context.ids) + [1] * len(
tokenized_question.ids[1:]
)
attention_mask = [1] * len(input_ids)
# Pad and create attention masks.
# Skip if truncation is needed
padding_length = max_len - len(input_ids)
if padding_length > 0: # pad
input_ids = input_ids + ([0] * padding_length)
attention_mask = attention_mask + ([0] * padding_length)
token_type_ids = token_type_ids + ([0] * padding_length)
elif padding_length < 0: # skip
self.skip = True
return
self.input_ids = input_ids
self.token_type_ids = token_type_ids
self.attention_mask = attention_mask
self.start_token_idx = start_token_idx
self.end_token_idx = end_token_idx
self.context_token_to_char = tokenized_context.offsets
with open(train_path) as f:
raw_train_data = json.load(f)
with open(eval_path) as f:
raw_eval_data = json.load(f)
def create_squad_examples(raw_data):
squad_examples = []
for item in raw_data["data"]:
for para in item["paragraphs"]:
context = para["context"]
for qa in para["qas"]:
question = qa["question"]
answer_text = qa["answers"][0]["text"]
all_answers = [_["text"] for _ in qa["answers"]]
start_char_idx = qa["answers"][0]["answer_start"]
squad_eg = SquadExample(
question, context, start_char_idx, answer_text, all_answers
)
squad_eg.preprocess()
squad_examples.append(squad_eg)
return squad_examples
def create_inputs_targets(squad_examples):
dataset_dict = {
"input_ids": [],
"token_type_ids": [],
"attention_mask": [],
"start_token_idx": [],
"end_token_idx": [],
}
for item in squad_examples:
if item.skip == False:
for key in dataset_dict:
dataset_dict[key].append(getattr(item, key))
for key in dataset_dict:
dataset_dict[key] = np.array(dataset_dict[key])
x = [
dataset_dict["input_ids"],
dataset_dict["token_type_ids"],
dataset_dict["attention_mask"],
]
y = [dataset_dict["start_token_idx"], dataset_dict["end_token_idx"]]
return x, y
train_squad_examples = create_squad_examples(raw_train_data)
x_train, y_train = create_inputs_targets(train_squad_examples)
print(f"{len(train_squad_examples)} training points created.")
eval_squad_examples = create_squad_examples(raw_eval_data)
x_eval, y_eval = create_inputs_targets(eval_squad_examples)
print(f"{len(eval_squad_examples)} evaluation points created.")
87599 training points created. 10570 evaluation points created.
BERT and Functional API を使用して質問応答モデルを作成します。
def create_model():
## BERT encoder
encoder = TFBertModel.from_pretrained("bert-base-uncased")
## QA Model
input_ids = layers.Input(shape=(max_len,), dtype=tf.int32)
token_type_ids = layers.Input(shape=(max_len,), dtype=tf.int32)
attention_mask = layers.Input(shape=(max_len,), dtype=tf.int32)
embedding = encoder(
input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask
)[0]
start_logits = layers.Dense(1, name="start_logit", use_bias=False)(embedding)
start_logits = layers.Flatten()(start_logits)
end_logits = layers.Dense(1, name="end_logit", use_bias=False)(embedding)
end_logits = layers.Flatten()(end_logits)
start_probs = layers.Activation(keras.activations.softmax)(start_logits)
end_probs = layers.Activation(keras.activations.softmax)(end_logits)
model = keras.Model(
inputs=[input_ids, token_type_ids, attention_mask],
outputs=[start_probs, end_probs],
)
loss = keras.losses.SparseCategoricalCrossentropy(from_logits=False)
optimizer = keras.optimizers.Adam(lr=5e-5)
model.compile(optimizer=optimizer, loss=[loss, loss])
return model
このコードはできれば Google Colab TPU ランタイムで実行されるべきです。Colab TPU で、各エポックは 5-6 分かかります。
use_tpu = True
if use_tpu:
# Create distribution strategy
tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
strategy = tf.distribute.TPUStrategy(tpu)
# Create model
with strategy.scope():
model = create_model()
else:
model = create_model()
model.summary()
INFO:absl:Entering into master device scope: /job:worker/replica:0/task:0/device:CPU:0 INFO:tensorflow:Initializing the TPU system: grpc://10.48.159.170:8470 INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 Model: "model" __________________________________________________________________________________________________ Layer (type) Output Shape Param # Connected to ================================================================================================== input_1 (InputLayer) [(None, 384)] 0 __________________________________________________________________________________________________ input_3 (InputLayer) [(None, 384)] 0 __________________________________________________________________________________________________ input_2 (InputLayer) [(None, 384)] 0 __________________________________________________________________________________________________ tf_bert_model (TFBertModel) ((None, 384, 768), ( 109482240 input_1[0][0] __________________________________________________________________________________________________ start_logit (Dense) (None, 384, 1) 768 tf_bert_model[0][0] __________________________________________________________________________________________________ end_logit (Dense) (None, 384, 1) 768 tf_bert_model[0][0] __________________________________________________________________________________________________ flatten (Flatten) (None, 384) 0 start_logit[0][0] __________________________________________________________________________________________________ flatten_1 (Flatten) (None, 384) 0 end_logit[0][0] __________________________________________________________________________________________________ activation_7 (Activation) (None, 384) 0 flatten[0][0] __________________________________________________________________________________________________ activation_8 (Activation) (None, 384) 0 flatten_1[0][0] ================================================================================================== Total params: 109,483,776 Trainable params: 109,483,776 Non-trainable params: 0
評価コールバックの作成
このコールバックは各エポックの後に検証データを使用して正確な一致スコアを計算します。
def normalize_text(text):
text = text.lower()
# Remove punctuations
exclude = set(string.punctuation)
text = "".join(ch for ch in text if ch not in exclude)
# Remove articles
regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
text = re.sub(regex, " ", text)
# Remove extra white space
text = " ".join(text.split())
return text
class ExactMatch(keras.callbacks.Callback):
"""
Each `SquadExample` object contains the character level offsets for each token
in its input paragraph. We use them to get back the span of text corresponding
to the tokens between our predicted start and end tokens.
All the ground-truth answers are also present in each `SquadExample` object.
We calculate the percentage of data points where the span of text obtained
from model predictions matches one of the ground-truth answers.
"""
def __init__(self, x_eval, y_eval):
self.x_eval = x_eval
self.y_eval = y_eval
def on_epoch_end(self, epoch, logs=None):
pred_start, pred_end = self.model.predict(self.x_eval)
count = 0
eval_examples_no_skip = [_ for _ in eval_squad_examples if _.skip == False]
for idx, (start, end) in enumerate(zip(pred_start, pred_end)):
squad_eg = eval_examples_no_skip[idx]
offsets = squad_eg.context_token_to_char
start = np.argmax(start)
end = np.argmax(end)
if start >= len(offsets):
continue
pred_char_start = offsets[start][0]
if end < len(offsets):
pred_char_end = offsets[end][1]
pred_ans = squad_eg.context[pred_char_start:pred_char_end]
else:
pred_ans = squad_eg.context[pred_char_start:]
normalized_pred_ans = normalize_text(pred_ans)
normalized_true_ans = [normalize_text(_) for _ in squad_eg.all_answers]
if normalized_pred_ans in normalized_true_ans:
count += 1
acc = count / len(self.y_eval[0])
print(f"\nepoch={epoch+1}, exact match score={acc:.2f}")
訓練と評価
exact_match_callback = ExactMatch(x_eval, y_eval)
model.fit(
x_train,
y_train,
epochs=1, # For demonstration, 3 epochs are recommended
verbose=2,
batch_size=64,
callbacks=[exact_match_callback],
)
epoch=1, exact match score=0.78 1346/1346 - 350s - activation_7_loss: 1.3488 - loss: 2.5905 - activation_8_loss: 1.2417 <tensorflow.python.keras.callbacks.History at 0x7fc78b4458d0>
以上