ホーム » TensorFlow » TensorFlow : Programmer’s Guide : データを読む

TensorFlow : Programmer’s Guide : データを読む

TensorFlow : Programmer’s Guide : データを読む (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
更新日時 : 04/26/2017; 07/26/2016
作成日時 : 02/12/2016

* 本家サイトのドキュメント構成の変更に伴い、本ページは以下のページをベースにするよう変更し、
また原文の加筆や変更に合わせて翻訳文も更新しました (04/26/2017) :
    https://www.tensorflow.org/programmers_guide/reading_data
* (obsolete) 本ページは、TensorFlow の本家サイトの How To – Reading data を翻訳した上で
適宜、補足説明したものです:
    (リンク切れ) https://www.tensorflow.org/versions/master/how_tos/reading_data/index.html#reading-data
* サンプルコードの動作確認はしておりますが、適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。

 
本文

TensorFlow プログラムにデータを取り込む3つの方法があります :

  • 供給 (Feeding) : 各ステップを実行する時に Python コードがデータを提供します。
  • ファイルから読む: TensorFlow グラフの最初に入力パイプラインがファイルからデータを読みます。
  • 事前ロードされたデータ (Preloaded data:): TensorFlow グラフの定数か変数が全てのデータ(小さいデータセット)を保持します。
 

供給 (Feeding)

TensorFlow の供給 (feed) メカニズムが計算グラフの任意のテンソルにデータを注入することを可能にします。Python 計算はこうしてデータをグラフに直接供給できます。

供給データは 計算を始める run() あるいは eval() 呼び出しへの feed_dict 引数で供給されます。

with tf.Session():
  input = tf.placeholder(tf.float32)
  classifier = ...
  print(classifier.eval(feed_dict={input: my_python_preprocessing_fn()}))

変数と定数を含む、任意のテンソルが供給データで置き換えられる一方で、ベスト・プラクティス (最良事例) は placeholder OP ノードを使うことです。placeholder は供給ターゲットとして serve するために単独で存在します。それは初期化されませんしデータも含みません。placeholder は供給なしに実行されるとエラーを生成しますので、それに供給することを忘れないでください。

MNIST データ上の訓練のために placeholder と供給を使用する例は
tensorflow/examples/tutorials/mnist/fully_connected_feed.py で見つかり、そして MNIST チュートリアル で説明されています。

 

ファイルから読む

ファイルからレコードを読むための典型的なパイプライン は次のステージを持ちます :

  1. ファイル名のリスト
  2. オプションのファイル名シャッフル
  3. オプションの epoch limit
  4. ファイル名・キュー
  5. ファイルフォーマットのためのリーダー (Reader)
  6. Reader で読まれるレコードのためのデコーダ (decoder)
  7. オプションの前処理 (preprocessing)
  8. サンプル・キュー (example queue)

ファイル名、シャッフルそして epoch limits

ファイル名のリストのためには、定数文字列 Tensor ( [“file0”, “file1”] のように、または [(“file%d” % i) for i in range(2)] ) あるいは tf.train.match_filenames_once 関数 を使います。

ファイル名のリストを tf.train.string_input_producer 関数 に渡します。string_input_producer はファイル名を reader がそれらを必要とするまで保持するために FIFO キューを作成します。

string_input_producer はシャッフルし epoch の最大値を設定するためのオプションを持ちます。queue runner はファイル名の全体リストをキューに各 epoch のために一度追加し、もし shuffle=True であれば一つの epoch 内でファイル名をシャッフルします。この手続きはファイル群の一様なサンプリングを提供し、その結果、サンプル群は互いに対して相対的に低くあるいは多くサンプルされることはありません。

queue runner は、queue からファイル名を引き出す reader とは別のスレッドで動作しますので、シャッフルとキューに入れるプロセスは reader をブロックしません。

ファイル・フォーマット

貴方の入力ファイル・フォーマットにマッチする reader を選択してファイル名キューを reader の read メソッドに渡します。read メソッドは、ファイルと(幾つかおかしなレコードがある場合にデバッグに有用な、)レコードを識別するキー、そしてスカラー文字列値を出力します。この文字列をサンプルを構成するテンソルに変換するためにデコーダと変換 OPs の一つ(あるいはそれ以上)を利用します。

CSV ファイル

カンマ区切り (CSV – comma-separated value) 形式のテキストファイルを読むためには、decode_csv 演算とともに TextLineReader を使用します。例えば :

filename_queue = tf.train.string_input_producer(["file0.csv", "file1.csv"])

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Default values, in case of empty columns. Also specifies the type of the
# decoded result.
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(
    value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4])

with tf.Session() as sess:
  # Start populating the filename queue.
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(1200):
    # Retrieve a single instance:
    example, label = sess.run([features, col5])

  coord.request_stop()
  coord.join(threads)

read の各実行はファイルから単一の行を読みます。そして decode_csv OP は結果をテンソルのリストに解析します。record_defaults 引数は結果としてのテンソルの型を決定して、もし値が入力文字列で欠けているならばデフォルト値を設定します。

read を実行するために run または eval を呼び出す前に、キューへのデータの取り込むためには tf.train.start_queue_runners を呼び出さなければなりません。そうでないとキューからのファイル名を待っている間、 read はブロックします。

固定長レコード

各レコードがバイトの固定数である、バイナリ・ファイルを読むためには、 tf.decode_raw 演算とともに tf.FixedLengthRecordReader を使用します。decode_raw 演算は文字列から uint8 テンソルに変換します。

例えば、CIFAR-10 データセット は、各レコードがバイトの固定長を使用して表される、ファイルフォーマットを使用します: ラベルのための1 バイト、続いて 3072 バイトの画像データです。ひとたび uint8 テンソルを持てば、標準操作で各部分をスライスし必要に応じて再フォーマットすることができます。CIFAR-10 については、どのように読みデコードするかを tensorflow_models/tutorials/image/cifar10/cifar10_input.py で見ることができ、このチュートリアル で説明されています。

標準 TensorFlow フォーマット

別のアプローチとしては貴方がどのようなデータを持とうがサポートされているフォーマットに変換することです。このアプローチはデータセットとネットワーク・アーキテクチャを混ぜ合わせることをより簡単にします。TensorFlow のための推奨フォーマットは(Features をフィールドとして含む) tf.train.Example protocol buffers を含む TFRecords ファイルです。

データを取得し、それを Examples protocol buffer に詰め込み、protocol buffer を文字列にシリアライズし、そして tf.python_io.TFRecordWriter クラス を使って文字列を TFRecords ファイルに書くような小さなプログラムを貴方は書くことができます。例えば、tensorflow/examples/how_tos/reading_data/convert_to_records.py は MNIST データをこのフォーマットに変換します。

TFRecords のファイルを読むには、tf.parse_single_example デコーダとともに tf.TFRecordReader を使用します。parse_single_example op は example protocol buffers をテンソルにデコードします。convert_to_records により生成されたデータを使用した MNIST サンプルは tensorflow/examples/how_tos/reading_data/fully_connected_reader.py で見つかります、これを fully_connected_feed 版と比較してみてください。

前処理

これでこれらのサンプルのどのような前処理でも行なうことができます。訓練可能なパラメータに依存しない前処理になります。サンプルはデータの正規化を含み、ランダムなスライスを選び取り、ノイズや歪みを追加します、等々。例としては tensorflow_models/tutorials/image/cifar10/cifar10_input.py を見てください。

バッチ処理 (Batching)

パイプラインの最後で、訓練、評価、あるいは推論のためのサンプルを一緒にバッチするためにもう一つ別のキューを使用します。このために tf.train.shuffle_batch 関数 を使ってサンプルの順序をランダム化するキューを使用します。

例:

def read_my_file_format(filename_queue):
  reader = tf.SomeReader()
  key, record_string = reader.read(filename_queue)
  example, label = tf.some_decoder(record_string)
  processed_example = some_processing(example)
  return processed_example, label

def input_pipeline(filenames, batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example, label = read_my_file_format(filename_queue)
  # min_after_dequeue defines how big a buffer we will randomly sample
  #   from -- bigger means better shuffling but slower start up and more
  #   memory used.
  # capacity must be larger than min_after_dequeue and the amount larger
  #   determines the maximum we will prefetch.  Recommendation:
  #   min_after_dequeue + (num_threads + a small safety margin) * batch_size
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

ファイル間の更なる並列性やサンプルのシャッフルを必要とするならば、tf.train.shuffle_batch_join 関数を使用して複数の reader インスタンスを使用します。例えば :

def read_my_file_format(filename_queue):
  # Same as above

def input_pipeline(filenames, batch_size, read_threads, num_epochs=None):
  filename_queue = tf.train.string_input_producer(
      filenames, num_epochs=num_epochs, shuffle=True)
  example_list = [read_my_file_format(filename_queue)
                  for _ in range(read_threads)]
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch_join(
      example_list, batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

貴方は依然として全ての reader で共有される単一のファイル名キューを使用しています。この方法は、エポックからの全てのファイルが開始されるまで、異なる reader が同じエポックから異なるファイルを使用することを確かなものにします。(それはまた通常はファイル名キューを満たすシングル・スレッドを持つには十分です。)

他の方法としては num_threads を 1 より大きくして tf.train.shuffle_batch 関数 を通して単一の reader を使用することです。これは一度に N ファイルの代わりに、同時に一つのファイルからそれに読ませることになります(しかし 1 スレッドよりも速いです)。

これは次の場合に重要になり得ます:

  • もし貴方が入力ファイルよりも多い読み込みスレッドを持つ場合、互いに近い同じファイルから同じサンプルを読む 2 つのスレッドが発生するリスクを回避するために。
  • あるいは N ファイルを並列に読む込むことが多すぎるディスク・シークを引き起こす場合。

どれだけの数のスレッドが必要ですか?tf.train.shuffle_batch* 関数は、それはサンプル・キューがどのくらい満杯かを示す、要約をグラフに追加します。もし十分な読み込みスレッドを持つ場合には、その要約はゼロの上にとどまるでしょう。貴方は TensorBoard を使用して訓練進捗としての要約を見る ことができます。

QueueRunner オブジェクトを使って事前読み込みをするスレッドを作成する

短縮版: 上でリストされた tf.train 関数の多くはグラフに tf.train.QueueRunner オブジェクトを追加します。これらは訓練か推論ステップを実行する前に tf.train.start_queue_runners を呼び出すことが要求されます。そうでないと永久にハングします。これはサンプル・キューを満たす、入力パイプラインを実行するスレッドを開始します。結果としてサンプルを取得する dequeue が成功します。これは、エラーがある時にクリーンにこれらのスレッドを停止するためには tf.train.Coordinator とのベストの組み合わせです。エポック数の制限を設定する場合、それは、初期化される必要があるエポック・カウンターを使用します。これらを組み合わせた推奨のコードパターンは :

# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()

閑話休題: ここで何が起きているか?

まずグラフを作成します。それはキューにより結合される2、3のパイプライン・ステージを持ちます。最初のステージは読み込むためのファイル名を生成してそれらをファイル名キューに enqueue します。2つ目のステージは(Reader を使って)ファイル名を消費し、サンプルを生成し、そしてそれらをサンプル・キューに enqueue します。どのようにセットアップしたかに依存して、実際には第2ステージの2、3の独立したコピーを持つかもしれません、その結果として並列的に複数のファイルから読み込むことができます。これらのステージの最後に enqueue 演算があります、これは次のステージがそこから dequeue するキューに enqueue します。私たちはこれらの enqueue 演算を実行するスレッドを開始することを望みます、その結果、訓練ループはサンプル・キューからサンプルを dequeue することができます。

 

これらのキューと enqueue する演算を作成する tf.train のヘルパーは tf.train.add_queue_runner 関数を使って tf.train.QueueRunner をグラフに追加します。各 QueueRunner は一つのステージについて責任を負い、そしてスレッドで実行される必要がある enqueue 演算のリストを保持します。ひとたびグラフが構築されたのであれば、tf.train.start_queue_runners 関数はグラフの各 QueueRunner に enqueue する演算を実行するスレッドを開始することを依頼します。

全てが上手く行けば、訓練ステップを実行できてキューはバックグラウンドのスレッドで満たされます。エポック制限を設定した場合、どこかのポイントでサンプルを dequeue する試みは tf.OutOfRangeError を得るでしょう。これは TensorFlow の “end of file” (EOF) と同義です — これはエポック制限に達してそれ以上のサンプルが利用可能でないことを意味します。

最後の構成要素は Coordinator です。これは何かがシャットダウンのシグナルを生成した時に全てのスレッドに知らせる責任を持ちます。一般的にはこれは例外があがることによります。例えばスレッドの一つが何某かの演算の実行中にエラーを得た場合です(あるいは通常の Python 例外)。

スレッディング、キュー、QueueRunners そして Coordinators についての詳細は こちら を見てください。

閑話休題: エポック制限の動作時にどのようにきれいにシャットダウンするか

(その上で)訓練するエポック数の制限を設けたモデルを想像してください。ファイル名を生成するスレッドは OutOfRange エラーを生成する前にそれだけの数だけ実行することを意味しています。QueueRunner はそのエラーを捕捉して、ファイル名キューを閉じて、そしてスレッドを抜けます。

キューを閉じることは2つのことをします :

  • ファイル名キューへの enqueue の任意の未来の試行はエラーを生成します。この時点でそれを行なおうとする任意のスレッドはあるべきではありませんが、他のエラーによりキューが閉じられた時には助けになります。
  • 任意の現在または未来の dequeue は(十分な要素が残されている場合)成功するか直ちに(OutOfRange エラーで)失敗するかです。それらは enqueue されるそれ以上の要素を待つことをブロックしません。何故なら以前のポイントからそれは起きないからです。

ポイントはファイル名キューが閉じた時、そのキューには依然として多くのファイル名がありがちなことです。そのため(reader と他の前処理とともに)パイプラインの次のステージはある程度の時間動作し続けます。ひとたびファイル名キューが使い尽くされたならば、しかしながら、ファイル名の dequeue への次の試み(例えば、作業していたファイルの処理を終了させた reader から)は OutOfRange エラーのトリガーになります。しかしこの場合、単一の QueueRunner に関連した複数のスレッドを持つかもしれません。もしこれが QueueRunner の最後のスレッドでないならば、OutOfRange エラーは一つのスレッドを exit させるだけです。これは、最後のファイルを依然として終わらせようとしている、他のスレッドに終了するまで続行することを許可します(tf.train.Coordinator を使用していることを仮定しています、他のタイプのエラーは全てのスレッドを停止させます)。ひとたび全ての reader スレッドが OutOfRange エラーに当たれば、その時に限り次のキュー、サンプル・キューが、閉じられます。

もう一度、サンプルキューはキューされた幾つかの要素を持つでしょう、そして訓練はそれらが使い尽くされるまで続きます。もしサンプル・キューが RandomShuffleQueue であるならば、shuffle_batch か shuffle_batch_join を使用しているので、通常は要素がバッファしたその min_after_dequeue attr よりも少なくなることは回避します。けれども、ひとたびキューが閉じられれば、その制限は撤廃されてキューはイベント的に空になります。その時点で実際の訓練スレッドは、サンプルキューから dequeue しようとする時、OutOfRange エラーを取得し始めて exit します。ひとたび訓練スレッドが終了すれば、tf.train.Coordinator.join は return してクリーンに exit できます。

レコードをフィルタリングする、またはレコード毎に複数サンプルを生成する

形状 [x, y, z] のサンプルの代わりに、形状 [batch, x, y, z] のサンプルのバッチを生成します。batch サイズは、このレコードをフィルタしたいならば(多分それはホールドアウト・セットにある?) 0 を取ることができて、あるいはレコード毎に複数のサンプルを生成したいのであれば 1 より大きくなります。そして(shuffle_batch あるいは shuffle_batch_join のような) batching 関数の一つを呼び出す時に、単純に enqueue_many=True を設定します。

疎な入力データ

SparseTensors はキューとは上手くやれません。もし SparseTensors を使用するのであれば、(batching の前に tf.parse_single_example を使用する代わりに)batching の後、文字列レコードを tf.parse_example を使用してデコードしなければなりません。

 

事前ロードされたデータ

これは、全体をメモリにロード可能な小さなデータセットのためだけに使用されます。

2つのアプローチがあります :

  • データを定数にストアする。
  • データを変数にストアする、それは初期化してからは決して変更しません。

定数の使用が少しばかり単純ですが、よりメモリを使います。(何故なら定数はグラフのデータ構造においてインラインにストアされるからです、これは何回かデュプリケートされるでしょう)。

training_data = ...
training_labels = ...
with tf.Session():
  input_data = tf.constant(training_data)
  input_labels = tf.constant(training_labels)
  ...

代わりに変数を使う場合、グラフが組み立てられた後にまた初期化する必要があります。

training_data = ...
training_labels = ...
with tf.Session() as sess:
  data_initializer = tf.placeholder(dtype=training_data.dtype,
                                    shape=training_data.shape)
  label_initializer = tf.placeholder(dtype=training_labels.dtype,
                                     shape=training_labels.shape)
  input_data = tf.Variable(data_initializer, trainable=False, collections=[])
  input_labels = tf.Variable(label_initializer, trainable=False, collections=[])
  ...
  sess.run(input_data.initializer,
           feed_dict={data_initializer: training_data})
  sess.run(input_labels.initializer,
           feed_dict={label_initializer: training_labels})

trainable=False の設定はグラフにおいて変数を GraphKeys.TRAINABLE_VARIABLES コレクションの外で保持しますので、訓練時にそれを更新しようとはしません。collections=[] の設定は変数を GraphKeys.VARIABLES コレクションの外で保持し、チェックポイントを保存し復旧するために使用されます。

どちらにしても、tf.train.slice_input_producer 関数 は一度にスライスを生成するために使用できます。これはエポック全体に渡ってサンプルをシャッフルし、batching が望ましくない時には更なるシャッフリングをします。そこで shuffle_batch 関数を使用する代わりに平易な tf.train.batch 関数を使用します。複数の前処理スレッドを使うためには、num_threads パラメータを 1 よりも大きな数字に設定します。

定数を使用してデータを事前ロードする MNIST サンプルは tensorflow/examples/how_tos/reading_data/fully_connected_preloaded.py で見つかります、そして変数を使用してデータを事前ロードするものは tensorflow/examples/how_tos/reading_data/fully_connected_preloaded_var.py で見つかります、貴方はこれらを上の fully_connected_feed と fully_connected_reader 版と比較することができます。

 

複数の入力パイプライン

一般的に一つのデータセット上で訓練して他で評価 (or “eval”) することを望むでしょう。これを行なう一つの方法は実際に2つの分かれたプロセスを持ちます :

  • 訓練プロセスは訓練入力データを読みそして定期的に訓練された全ての変数をチェックポイント・ファイルに書き出します。
  • 評価プロセスはチェックポイントファイルを、検証入力データを読み込む推論モデルに復旧します。

これは サンプル CIFAR-10 モデル で行なわれているものです。これは2つの利益があります:

  • eval は訓練された変数の単一のスナップショットで遂行されます。
  • 訓練が完了して exit した後でさえも eval が遂行できます。

同じプロセスの同じグラフで訓練と評価を持ち、それらの訓練された変数を共有することができます。チュートリアル: 変数を共有する を見てください。

 

以上

AI導入支援 #2 ウェビナー

スモールスタートを可能としたAI導入支援   Vol.2
[無料 WEB セミナー] [詳細]
「画像認識 AI PoC スターターパック」の紹介
既に AI 技術を実ビジネスで活用し、成果を上げている日本企業も多く存在しており、競争優位なビジネスを展開しております。
しかしながら AI を導入したくとも PoC (概念実証) だけでも高額な費用がかかり取組めていない企業も少なくないようです。A I導入時には欠かせない PoC を手軽にしかも短期間で認知度を確認可能とするサービの紹介と共に、AI 技術の特性と具体的な導入プロセスに加え運用時のポイントについても解説いたします。
日時:2021年10月13日(水)
会場:WEBセミナー
共催:クラスキャット、日本FLOW(株)
後援:働き方改革推進コンソーシアム
参加費: 無料 (事前登録制)
人工知能開発支援
◆ クラスキャットは 人工知能研究開発支援 サービスを提供しています :
  • テクニカルコンサルティングサービス
  • 実証実験 (プロトタイプ構築)
  • アプリケーションへの実装
  • 人工知能研修サービス
◆ お問合せ先 ◆
(株)クラスキャット
セールス・インフォメーション
E-Mail:sales-info@classcat.com