Keras 2 : examples : 時系列 – グラフニューラルネットと LSTM による交通予測 (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
作成日時 : 06/20/2022 (keras 2.9.0)
* 本ページは、Keras の以下のドキュメントを翻訳した上で適宜、補足説明したものです:
- Code examples : Timeseries : Traffic forecasting using graph neural networks and LSTM (Author: Arash Khodadadi)
* サンプルコードの動作確認はしておりますが、必要な場合には適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
- 人工知能研究開発支援
- 人工知能研修サービス(経営者層向けオンサイト研修)
- テクニカルコンサルティングサービス
- 実証実験(プロトタイプ構築)
- アプリケーションへの実装
- 人工知能研修サービス
- PoC(概念実証)を失敗させないための支援
- お住まいの地域に関係なく Web ブラウザからご参加頂けます。事前登録 が必要ですのでご注意ください。
◆ お問合せ : 本件に関するお問い合わせ先は下記までお願いいたします。
- 株式会社クラスキャット セールス・マーケティング本部 セールス・インフォメーション
- sales-info@classcat.com ; Web: www.classcat.com ; ClassCatJP
Keras 2 : examples : 時系列 – グラフニューラルネットと LSTM による交通予測
Description : このサンプルはグラフ上の時系列予測を行なう方法を実演します。
イントロダクション
このサンプルはグラフニューラルネットワークと LSTM を使用して交通の状態を予測する方法を示します。特に、道路セグメントのコレクションに対する交通速度の履歴が与えられたとき、交通速度の未来値を予測することに関心があります。
この問題を解くポピュラーな方法は、各道路セグメントの交通速度を分離した時系列として考えて、同じ時系列の過去値を使用して各時系列の未来値を予測することです。
けれども、この方法はある道路セグメントの交通速度の隣接セグメントへの依存性を無視しています。隣接する道路のコレクションの交通速度間の複雑な相互作用を考慮することを可能にするために、交通ネットワークをグラフとして定義して、交通速度をこのグラフのシグナルとして考えることができます。この例では、グラフ上の時系列データを処理できるニューラルネットワーク・アーキテクチャを実装します。データを処理してグラフ上の予測のために tf.data.Dataset を作成する方法を最初に示します。それから、グラフ上の予測を実行するためにグラフの畳み込みと LSTM 層を使用するモデルを実装します。
データ処理とモデルアーキテクチャはこの論文にインスパイアされています :
- Yu, Bing, Haoteng Yin, and Zhanxing Zhu. “Spatio-temporal graph convolutional networks: a deep learning framework for traffic forecasting.” Proceedings of the 27th International Joint Conference on Artificial Intelligence, 2018. (github)
セットアップ
import pandas as pd
import numpy as np
import os
import typing
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
データ準備
データの説明
PeMSD7 という名前の実世界の交通速度データセットを使用します。Yu et al., 2018 により収集され準備されたバージョンを使用します、それは ここ で利用可能です。
データは 2 つのファイルから構成されます :
- W_228.csv はカリフォルニアの地区 7 に渡る 228 のステーション間の距離を含みます。
- V_228.csv は 2012 年 5 月と 6 月の平日にそれらのステーションに対して収集された交通速度を含みます。
データセットの完全な記述は Yu et al., 2018 で見つかります。
データのロード
url = "https://github.com/VeritasYin/STGCN_IJCAI-18/raw/master/data_loader/PeMS-M.zip"
data_dir = keras.utils.get_file(origin=url, extract=True, archive_format="zip")
data_dir = data_dir.rstrip(".zip")
route_distances = pd.read_csv(
os.path.join(data_dir, "W_228.csv"), header=None
).to_numpy()
speeds_array = pd.read_csv(os.path.join(data_dir, "V_228.csv"), header=None).to_numpy()
print(f"route_distances shape={route_distances.shape}")
print(f"speeds_array shape={speeds_array.shape}")
route_distances shape=(228, 228) speeds_array shape=(12672, 228)
道路のサブサンプリング
問題サイズを減じて訓練を高速にするため、データセットの 228 道路から 26 道路のサンプルだけで作業します。道路 0 から始めて、それに最も近い 5 つの道路を選び、そしてこのプロセスを 25 の道路を取得するまで続けることによって道路を選択しました。道路の任意の他のサブセットを選択できます。この方法で道路を選択したのは相関性がある速度時系列を伴う道路を持つ尤度を高めるためです。
sample_routes = [
0,
1,
4,
7,
8,
11,
15,
108,
109,
114,
115,
118,
120,
123,
124,
126,
127,
129,
130,
132,
133,
136,
139,
144,
147,
216,
]
route_distances = route_distances[np.ix_(sample_routes, sample_routes)]
speeds_array = speeds_array[:, sample_routes]
print(f"route_distances shape={route_distances.shape}")
print(f"speeds_array shape={speeds_array.shape}")
route_distances shape=(26, 26) speeds_array shape=(12672, 26)
データ可視化
ここにルートの 2 つに対する交通速度の時系列があります :
plt.figure(figsize=(18, 6))
plt.plot(speeds_array[:, [0, -1]])
plt.legend(["route_0", "route_25"])
<matplotlib.legend.Legend at 0x7fea19dc90d0>
異なるルートの時系列間の相関性を可視化することもできます。
plt.figure(figsize=(8, 8))
plt.matshow(np.corrcoef(speeds_array.T), 0)
plt.xlabel("road number")
plt.ylabel("road number")
Text(0, 0.5, 'road number')
この相関ヒートマップを使用すれば、例えばルート 4, 5, 6 の速度が非常に相関していることが分かります。
データの分割と正規化
次に、速度値配列を訓練/検証/テストセットに分割して結果としての配列を正規化します :
train_size, val_size = 0.5, 0.2
def preprocess(data_array: np.ndarray, train_size: float, val_size: float):
"""Splits data into train/val/test sets and normalizes the data.
Args:
data_array: ndarray of shape `(num_time_steps, num_routes)`
train_size: A float value between 0.0 and 1.0 that represent the proportion of the dataset
to include in the train split.
val_size: A float value between 0.0 and 1.0 that represent the proportion of the dataset
to include in the validation split.
Returns:
`train_array`, `val_array`, `test_array`
"""
num_time_steps = data_array.shape[0]
num_train, num_val = (
int(num_time_steps * train_size),
int(num_time_steps * val_size),
)
train_array = data_array[:num_train]
mean, std = train_array.mean(axis=0), train_array.std(axis=0)
train_array = (train_array - mean) / std
val_array = (data_array[num_train : (num_train + num_val)] - mean) / std
test_array = (data_array[(num_train + num_val) :] - mean) / std
return train_array, val_array, test_array
train_array, val_array, test_array = preprocess(speeds_array, train_size, val_size)
print(f"train set size: {train_array.shape}")
print(f"validation set size: {val_array.shape}")
print(f"test set size: {test_array.shape}")
train set size: (6336, 26) validation set size: (2534, 26) test set size: (3802, 26)
TensorFlow データセットの作成
次に予測問題のためのデータセットを作成します。予測問題は以下のように記述できます : 時刻 t+1, t+2, …, t+T における道路速度値のシークエンスが与えられたとき、時刻 t+T+1, …, t+T+h に対する道路速度の未来値を予測することを望みます。従って各時刻 t に対してモデルへの入力は各々がサイズ N の T ベクトルで、ターゲットは各々がサイズ N の h ベクトルです、ここで N は道路数です。
Keras 組み込み関数 timeseries_dataset_from_array() を使用します。下の関数 create_tf_dataset() は入力として numpy.ndarray を取り tf.data.Dataset を返します。この関数内で input_sequence_length=T と forecast_horizon=h です。
引数 multi_horizon もう少し説明が必要です。forecast_horizon=3 と仮定します。multi_horizon=True であれば、モデルは時間ステップ t+T+1, t+T+2, t+T+3 に対する予測をします。従ってターゲットは shape (T, 3) を持ちます。しかし multi_horizon=False の場合には、モデルは時間ステップ t+T+3 に対してだけ予測しますので、ターゲットは shape (T, 1) になります。
各バッチの入力テンソルは shape (batch_size, input_sequence_length, num_routes, 1) を持つことに気づくかもしれません。最後の次元はモデルをより一般的にするために追加されます : 各時間ステップにおいて、各道路に対する入力特徴は複数の時系列を含む可能性があります。例えば、入力特徴として速度の履歴値に加えて気温の時系列を利用することを望むかもしれません。けれどもこの例では、入力の最後の次元は常に 1 です。
先の 3 時間ステップの速度を予測するために各道路の速度の最後の 12 値を使用します。
from tensorflow.keras.preprocessing import timeseries_dataset_from_array
batch_size = 64
input_sequence_length = 12
forecast_horizon = 3
multi_horizon = False
def create_tf_dataset(
data_array: np.ndarray,
input_sequence_length: int,
forecast_horizon: int,
batch_size: int = 128,
shuffle=True,
multi_horizon=True,
):
"""Creates tensorflow dataset from numpy array.
This function creates a dataset where each element is a tuple `(inputs, targets)`.
`inputs` is a Tensor
of shape `(batch_size, input_sequence_length, num_routes, 1)` containing
the `input_sequence_length` past values of the timeseries for each node.
`targets` is a Tensor of shape `(batch_size, forecast_horizon, num_routes)`
containing the `forecast_horizon`
future values of the timeseries for each node.
Args:
data_array: np.ndarray with shape `(num_time_steps, num_routes)`
input_sequence_length: Length of the input sequence (in number of timesteps).
forecast_horizon: If `multi_horizon=True`, the target will be the values of the timeseries for 1 to
`forecast_horizon` timesteps ahead. If `multi_horizon=False`, the target will be the value of the
timeseries `forecast_horizon` steps ahead (only one value).
batch_size: Number of timeseries samples in each batch.
shuffle: Whether to shuffle output samples, or instead draw them in chronological order.
multi_horizon: See `forecast_horizon`.
Returns:
A tf.data.Dataset instance.
"""
inputs = timeseries_dataset_from_array(
np.expand_dims(data_array[:-forecast_horizon], axis=-1),
None,
sequence_length=input_sequence_length,
shuffle=False,
batch_size=batch_size,
)
target_offset = (
input_sequence_length
if multi_horizon
else input_sequence_length + forecast_horizon - 1
)
target_seq_length = forecast_horizon if multi_horizon else 1
targets = timeseries_dataset_from_array(
data_array[target_offset:],
None,
sequence_length=target_seq_length,
shuffle=False,
batch_size=batch_size,
)
dataset = tf.data.Dataset.zip((inputs, targets))
if shuffle:
dataset = dataset.shuffle(100)
return dataset.prefetch(16).cache()
train_dataset, val_dataset = (
create_tf_dataset(data_array, input_sequence_length, forecast_horizon, batch_size)
for data_array in [train_array, val_array]
)
test_dataset = create_tf_dataset(
test_array,
input_sequence_length,
forecast_horizon,
batch_size=test_array.shape[0],
shuffle=False,
multi_horizon=multi_horizon,
)
道路のグラフ
前述のように、道路セグメントがグラフを形成していると仮定しています。PeMSD7 データセットは道路セグメントの距離を持ちます。次のステップはこれらの距離からグラフの隣接行列を作成することです。Yu et al., 2018 (equation 10) に従って、対応する道路間の距離が閾値よりも短い場合にはグラフの 2 つのノードの間にはエッジがあるものと仮定します。
def compute_adjacency_matrix(
route_distances: np.ndarray, sigma2: float, epsilon: float
):
"""Computes the adjacency matrix from distances matrix.
It uses the formula in https://github.com/VeritasYin/STGCN_IJCAI-18#data-preprocessing to
compute an adjacency matrix from the distance matrix.
The implementation follows that paper.
Args:
route_distances: np.ndarray of shape `(num_routes, num_routes)`. Entry `i,j` of this array is the
distance between roads `i,j`.
sigma2: Determines the width of the Gaussian kernel applied to the square distances matrix.
epsilon: A threshold specifying if there is an edge between two nodes. Specifically, `A[i,j]=1`
if `np.exp(-w2[i,j] / sigma2) >= epsilon` and `A[i,j]=0` otherwise, where `A` is the adjacency
matrix and `w2=route_distances * route_distances`
Returns:
A boolean graph adjacency matrix.
"""
num_routes = route_distances.shape[0]
route_distances = route_distances / 10000.0
w2, w_mask = (
route_distances * route_distances,
np.ones([num_routes, num_routes]) - np.identity(num_routes),
)
return (np.exp(-w2 / sigma2) >= epsilon) * w_mask
関数 compute_adjacency_matrix() はブーリアン隣接行列を返します、ここで 1 は 2 ノード間にエッジがあることを意味しています。グラフについての情報をストアするために次のクラスを使用します。
class GraphInfo:
def __init__(self, edges: typing.Tuple[list, list], num_nodes: int):
self.edges = edges
self.num_nodes = num_nodes
sigma2 = 0.1
epsilon = 0.5
adjacency_matrix = compute_adjacency_matrix(route_distances, sigma2, epsilon)
node_indices, neighbor_indices = np.where(adjacency_matrix == 1)
graph = GraphInfo(
edges=(node_indices.tolist(), neighbor_indices.tolist()),
num_nodes=adjacency_matrix.shape[0],
)
print(f"number of nodes: {graph.num_nodes}, number of edges: {len(graph.edges[0])}")
number of nodes: 26, number of edges: 150
ネットワーク・アーキテクチャ
グラフ上で予測するモデルはグラフ畳み込み層と LSTM 層から構成されます。
グラフ畳み込み層
グラフ畳み込みの実装は この Keras サンプル の実装に類似しています。その例では層への入力は shape (num_nodes, in_feat) の 2D テンソルですが、私達の例では層への入力は shape (num_nodes, batch_size, input_seq_length, in_feat) の 4D テンソルであることに注意してください。グラフ畳み込み層は以下のステップを実行します :
- ノードの表現は入力特徴に self.weight を乗算して self.compute_nodes_representation() で計算されます。
- 集積された近傍のメッセージは、まず近傍の表現を集積してから結果を self.weight で乗算して self.compute_aggregated_messages() で計算されます。
- 層の最終的な出力はノード表現と近傍の集積されたメッセージを連結して self.update() で計算されます。
class GraphConv(layers.Layer):
def __init__(
self,
in_feat,
out_feat,
graph_info: GraphInfo,
aggregation_type="mean",
combination_type="concat",
activation: typing.Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
self.in_feat = in_feat
self.out_feat = out_feat
self.graph_info = graph_info
self.aggregation_type = aggregation_type
self.combination_type = combination_type
self.weight = tf.Variable(
initial_value=keras.initializers.glorot_uniform()(
shape=(in_feat, out_feat), dtype="float32"
),
trainable=True,
)
self.activation = layers.Activation(activation)
def aggregate(self, neighbour_representations: tf.Tensor):
aggregation_func = {
"sum": tf.math.unsorted_segment_sum,
"mean": tf.math.unsorted_segment_mean,
"max": tf.math.unsorted_segment_max,
}.get(self.aggregation_type)
if aggregation_func:
return aggregation_func(
neighbour_representations,
self.graph_info.edges[0],
num_segments=self.graph_info.num_nodes,
)
raise ValueError(f"Invalid aggregation type: {self.aggregation_type}")
def compute_nodes_representation(self, features: tf.Tensor):
"""Computes each node's representation.
The nodes' representations are obtained by multiplying the features tensor with
`self.weight`. Note that
`self.weight` has shape `(in_feat, out_feat)`.
Args:
features: Tensor of shape `(num_nodes, batch_size, input_seq_len, in_feat)`
Returns:
A tensor of shape `(num_nodes, batch_size, input_seq_len, out_feat)`
"""
return tf.matmul(features, self.weight)
def compute_aggregated_messages(self, features: tf.Tensor):
neighbour_representations = tf.gather(features, self.graph_info.edges[1])
aggregated_messages = self.aggregate(neighbour_representations)
return tf.matmul(aggregated_messages, self.weight)
def update(self, nodes_representation: tf.Tensor, aggregated_messages: tf.Tensor):
if self.combination_type == "concat":
h = tf.concat([nodes_representation, aggregated_messages], axis=-1)
elif self.combination_type == "add":
h = nodes_representation + aggregated_messages
else:
raise ValueError(f"Invalid combination type: {self.combination_type}.")
return self.activation(h)
def call(self, features: tf.Tensor):
"""Forward pass.
Args:
features: tensor of shape `(num_nodes, batch_size, input_seq_len, in_feat)`
Returns:
A tensor of shape `(num_nodes, batch_size, input_seq_len, out_feat)`
"""
nodes_representation = self.compute_nodes_representation(features)
aggregated_messages = self.compute_aggregated_messages(features)
return self.update(nodes_representation, aggregated_messages)
LSTM + グラフ畳み込み
入力テンソルにグラフ畳み込み層を適用することにより、時間に渡るノードの表現を含む別のテンソル (別の 4D テンソル) を取得します。各時間ステップについて、ノードの表現はその近傍からの情報により提供されます。
けれども、良い予測を行なうには、近傍からの情報を必要とするだけでなく、時間に渡る情報も処理する必要があります。この目的で、各ノードのテンソルをリカレント層に渡すことができます。下の LSTMGC 層は、最初に入力にグラフ畳み込み層を適用してから結果を LSTM 層に渡します。
class LSTMGC(layers.Layer):
"""Layer comprising a convolution layer followed by LSTM and dense layers."""
def __init__(
self,
in_feat,
out_feat,
lstm_units: int,
input_seq_len: int,
output_seq_len: int,
graph_info: GraphInfo,
graph_conv_params: typing.Optional[dict] = None,
**kwargs,
):
super().__init__(**kwargs)
# graph conv layer
if graph_conv_params is None:
graph_conv_params = {
"aggregation_type": "mean",
"combination_type": "concat",
"activation": None,
}
self.graph_conv = GraphConv(in_feat, out_feat, graph_info, **graph_conv_params)
self.lstm = layers.LSTM(lstm_units, activation="relu")
self.dense = layers.Dense(output_seq_len)
self.input_seq_len, self.output_seq_len = input_seq_len, output_seq_len
def call(self, inputs):
"""Forward pass.
Args:
inputs: tf.Tensor of shape `(batch_size, input_seq_len, num_nodes, in_feat)`
Returns:
A tensor of shape `(batch_size, output_seq_len, num_nodes)`.
"""
# convert shape to (num_nodes, batch_size, input_seq_len, in_feat)
inputs = tf.transpose(inputs, [2, 0, 1, 3])
gcn_out = self.graph_conv(
inputs
) # gcn_out has shape: (num_nodes, batch_size, input_seq_len, out_feat)
shape = tf.shape(gcn_out)
num_nodes, batch_size, input_seq_len, out_feat = (
shape[0],
shape[1],
shape[2],
shape[3],
)
# LSTM takes only 3D tensors as input
gcn_out = tf.reshape(gcn_out, (batch_size * num_nodes, input_seq_len, out_feat))
lstm_out = self.lstm(
gcn_out
) # lstm_out has shape: (batch_size * num_nodes, lstm_units)
dense_output = self.dense(
lstm_out
) # dense_output has shape: (batch_size * num_nodes, output_seq_len)
output = tf.reshape(dense_output, (num_nodes, batch_size, self.output_seq_len))
return tf.transpose(
output, [1, 2, 0]
) # returns Tensor of shape (batch_size, output_seq_len, num_nodes)
モデル訓練
in_feat = 1
batch_size = 64
epochs = 20
input_sequence_length = 12
forecast_horizon = 3
multi_horizon = False
out_feat = 10
lstm_units = 64
graph_conv_params = {
"aggregation_type": "mean",
"combination_type": "concat",
"activation": None,
}
st_gcn = LSTMGC(
in_feat,
out_feat,
lstm_units,
input_sequence_length,
forecast_horizon,
graph,
graph_conv_params,
)
inputs = layers.Input((input_sequence_length, graph.num_nodes, in_feat))
outputs = st_gcn(inputs)
model = keras.models.Model(inputs, outputs)
model.compile(
optimizer=keras.optimizers.RMSprop(learning_rate=0.0002),
loss=keras.losses.MeanSquaredError(),
)
model.fit(
train_dataset,
validation_data=val_dataset,
epochs=epochs,
callbacks=[keras.callbacks.EarlyStopping(patience=10)],
)
Epoch 1/20 99/99 [==============================] - 8s 69ms/step - loss: 0.5542 - val_loss: 0.2459 Epoch 2/20 99/99 [==============================] - 8s 84ms/step - loss: 0.1923 - val_loss: 0.1346 Epoch 3/20 99/99 [==============================] - 9s 92ms/step - loss: 0.1312 - val_loss: 0.1068 Epoch 4/20 99/99 [==============================] - 10s 100ms/step - loss: 0.1083 - val_loss: 0.0914 Epoch 5/20 99/99 [==============================] - 12s 120ms/step - loss: 0.0962 - val_loss: 0.0839 Epoch 6/20 99/99 [==============================] - 9s 94ms/step - loss: 0.0899 - val_loss: 0.0796 Epoch 7/20 99/99 [==============================] - 10s 102ms/step - loss: 0.0864 - val_loss: 0.0771 Epoch 8/20 99/99 [==============================] - 10s 103ms/step - loss: 0.0842 - val_loss: 0.0760 Epoch 9/20 99/99 [==============================] - 9s 91ms/step - loss: 0.0826 - val_loss: 0.0744 Epoch 10/20 99/99 [==============================] - 9s 95ms/step - loss: 0.0815 - val_loss: 0.0735 Epoch 11/20 99/99 [==============================] - 12s 118ms/step - loss: 0.0807 - val_loss: 0.0729 Epoch 12/20 99/99 [==============================] - 11s 106ms/step - loss: 0.0799 - val_loss: 0.0734 Epoch 13/20 99/99 [==============================] - 11s 114ms/step - loss: 0.0795 - val_loss: 0.0721 Epoch 14/20 99/99 [==============================] - 13s 133ms/step - loss: 0.0791 - val_loss: 0.0719 Epoch 15/20 99/99 [==============================] - 11s 114ms/step - loss: 0.0787 - val_loss: 0.0716 Epoch 16/20 99/99 [==============================] - 12s 118ms/step - loss: 0.0784 - val_loss: 0.0715 Epoch 17/20 99/99 [==============================] - 13s 131ms/step - loss: 0.0781 - val_loss: 0.0713 Epoch 18/20 99/99 [==============================] - 11s 111ms/step - loss: 0.0778 - val_loss: 0.0712 Epoch 19/20 99/99 [==============================] - 12s 121ms/step - loss: 0.0776 - val_loss: 0.0711 Epoch 20/20 99/99 [==============================] - 11s 116ms/step - loss: 0.0774 - val_loss: 0.0710 <keras.callbacks.History at 0x7fea223a10d0>
テストセットで予測する
今ではテストセットに対する予測を行なうために訓練済みモデルを使用できます。下では、モデルの MAE を計算してそれを naive な予測の MAE と比較します。naive な予測は各ノードの速度の最後の値です。
x_test, y = next(test_dataset.as_numpy_iterator())
y_pred = model.predict(x_test)
plt.figure(figsize=(18, 6))
plt.plot(y[:, 0, 0])
plt.plot(y_pred[:, 0, 0])
plt.legend(["actual", "forecast"])
naive_mse, model_mse = (
np.square(x_test[:, -1, :, 0] - y[:, 0, :]).mean(),
np.square(y_pred[:, 0, :] - y[:, 0, :]).mean(),
)
print(f"naive MAE: {naive_mse}, model MAE: {model_mse}")
naive MAE: 0.13472308593195767, model MAE: 0.12683941463664059
もちろん、ここでの目標は方法を実演することで、最良のパフォーマンスを獲得することではありません。モデルの精度を改良するため、総てのモデル・ハイパーパラメータは注意深く調整されるべきです。更に、モデルの表現力を高めるために幾つかの LSTMGC ブロックをスタックすることができます。
以上