TensorFlow : Programmer’s Guide : スレッディングとキュー (翻訳/解説)
翻訳 : (株)クラスキャット セールスインフォメーション
更新日時 : 09/10/2017; 07/26/2016
作成日時 : 03/03/2016
* 本ページは、TensorFlow 本家サイトの Programmer’s Guide – Threading and Queues を翻訳した上で
適宜、補足説明したものです:
* (obsolete) 本ページは、TensorFlow 本家サイトの How To – Threading and Queues を翻訳した上で
適宜、補足説明したものです:
- https://www.tensorflow.org/versions/master/how_tos/threading_and_queues/index.html#threading-and-queues
* サンプルコードの動作確認はしておりますが、適宜、追加改変しています。
* ご自由にリンクを張って頂いてかまいませんが、sales-info@classcat.com までご一報いただけると嬉しいです。
キューは TensorFlow を使った非同期計算のための力強い機構です。TensorFlow において全てがそうであるように、キューは TensorFlow グラフのノードです。それは変数のように、ステートフル(状態を保持する)なノードです : 他のノードはその中身を修正できます。特に、ノードは新しい項目をキューに入れたり (enqueue)、既存の項目をキューから取り出したり (dequeue) することができます。
キューのための感覚を掴むために、簡単な例を考えましょう。"ファーストイン、ファーストアウト" キュー (FIFOQueue
) を作成してゼロで満たしましょう。それからグラフを構築します。それはキューから項目を取り出し、その項目に 1 を加え、そしてキューの最後に返します。ゆっくりと、キュー上の数は増加していきます。

Enqueue、EnqueueMany そして Dequeue は特殊なノードです。これらは通常の値の代わりにキューへのポインタを取り、それを変更することが可能です。これらをキューのメソッドのように考えることを勧めます。実際に、Python API では、それらはキュー・オブジェクトのメソッドです (eg. q.enqueue(…))。キューのための感覚が少し持てたところで、詳細に突入しましょう…
キュー利用方法の概要
FIFOQueue と RandomShuffleQueue のような Queue はグラフでテンソルを非同期に計算するために重要な TensorFlow オブジェクトです。
例えば、典型的な入力アーキテクチャは RandomShuffleQueue をモデルを訓練するための入力を準備するために使用します :
- 複数スレッドが訓練サンプルを準備してそれらをキューにプッシュします。
- 訓練スレッドが、キューからミニ・バッチを dequeue する訓練 op を実行します。
それはキューからミニ・バッチを dequeue します。
このアーキテクチャは、How To: データを読む でハイライトされたように、多くの利益があります。それはまた、入力パイプラインの構築を単純化する、関数の概要を与えます。
TensorFlow セッション・オブジェクトはマルチ・スレッド化されていますので、複数スレッドが容易に同じセッションを使用して並列的に op を実行できます。けれども、上述のようなスレッドを駆使する Python プログラムを実装することは必ずしも簡単ではありません。全てのスレッドは一緒に停止することができなければなりませんし、例外は補足され報告されなければなりませんし、そしてキューは停止時に正しく閉じられなければなりません。
TensorFlow は手助けするために2つのクラスを提供しています : tf.Coordinator と tf.QueueRunner です。これら2つのクラスは一緒に使用されるように設計されています。Coordinator クラスは複数スレッドが一緒に停止してそれらの停止を待つプログラムへの例外を報告することを手助けします。QueueRunner クラスは、同じキューにテンソルを enqueue することを協力する、多くのスレッドを作成するために使用されます。
Coordinator
Coordinator クラスは複数のスレッドが一緒に停止する手助けをします。
そのキーとなるメソッドは :
- should_stop(): スレッドが停止すべき場合に True を返します。
- request_stop(<exception>): スレッドが停止するようにリクエストします。
- join(<list of threads>): 指定したスレッドが停止するまで待ちます。
最初に Coordinator オブジェクトを作成して、そして coordinator を使用する幾つかのスレッドを作成します。スレッドは典型的には、should_stop() が True を返すときに停止する、ループを実行します。
任意のスレッドは計算が停止すべきかを決定できます。それには request_stop() を呼び出さなければならいないだけで他のスレッドは should_stop() が True を返す時に停止します。
# スレッド・ボディ: coordinator が stop がリクエストされたことを示すまでループします。 # ある条件が true になれば、coordinator に停止を要求します。 def MyLoop(coord): while not coord.should_stop(): ...do something... if ...some condition...: coord.request_stop() # メイン・コード: coordinator を作成します。 coord = Coordinator() # 'MyLoop()' を実行する 10 スレッドを作成します。 threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)] # スレッドを開始してそれら全てが停止するのを待ちます。 for t in threads: t.start() coord.join(threads)
明らかに、coordinator はスレッドが非常に異なることをするのを管理できます。それらは上の例のように全て同じである必要はありません。coordinator はまた例外を捕捉して報告するためのサポートを持ちます。より詳細は Coordinator クラス 文書を見てください。
QueueRunner
QueueRunner クラスは enqueue op を繰り返し実行するスレッドを幾つか作成します。
これらのスレッドは一緒に停止するために coordinator を使用できます。加えて、queue runner はクローザ (closer)・スレッドを実行します、これは例外が coordinator に報告された場合に自動的にキューを閉じます。
上で説明されたアーキテクチャの実装のために queue runner を使うことができます。
まず入力サンプルのための Queue を使用するグラフを組み立てます。サンプルを処理してそれらをキューに enqueue する ops を追加します。キューから dequeue することから開始する訓練 ops を追加します。
example = ...ops to create one example... # キューを作成して、サンプルを queue に一度に一つ enqueue します。 queue = tf.RandomShuffleQueue(...) enqueue_op = queue.enqueue(example) # サンプルのバッチを dequeue して開始される訓練グラフを作成します。 inputs = queue.dequeue_many(batch_size) train_op = ...use 'inputs' to build the training part of the graph...
Python 訓練プログラムでは、QueueRunner を作成します、これはサンプルを処理して enqueue するための2、3のスレッドを実行します。Coordinator を作成して queue runner にそのスレッドを coordinator とともに開始することを要求します。coordinator をまた使用する訓練ループを書きます。
# queue runner を作成します、これは サンプルを enqueue するために 4 スレッドを並列的に実行します。 qr = tf.train.QueueRunner(queue, [enqueue_op] * 4) # グラフを launch します。 sess = tf.Session() # coordinator を作成して、queue runner スレッドを launch します。 coord = tf.train.Coordinator() enqueue_threads = qr.create_threads(sess, coord=coord, start=True) # 訓練ループを実行します、coordinator で終了を制御します。 for step in xrange(1000000): if coord.should_stop(): break sess.run(train_op) # 終了した時、スレッドに停止を要求します。 coord.request_stop() # そしてそれらが実際にそれを行なうのを待ちます。 coord.join(threads)
例外を処理する
queue runner で開始されたスレッドは enqueue ops を実行するだけ以上のことをします。それらは queue により生成された、(queue が閉じられたことを報告するために使用される)OutOfRangeError を含む例外を捕捉して処理します。
coordinator を使用する訓練プログラムはメイン・ループで同様に例外を捕捉して報告しなければなりあせん。
これは上の訓練ループの改良版です。
try: for step in xrange(1000000): if coord.should_stop(): break sess.run(train_op) except Exception, e: # coordinator に例外を報告します。 coord.request_stop(e) # 通常のように終了します。停止を二度要求しても無害です。 coord.request_stop() coord.join(threads)
以上