tensorflow 程序挂起的原因,即整个进程不报错又不执行的原因

时间:2022-08-27 15:40:57

一、说明:在使用tensorflow的过程中,出现过程序不报错又不接下去执行的错误,后来分析了原因是tf的数据线程没有启动,导致数据流图没办法计算,整个程序就卡在哪里。

更深层次的原因是tensorflow的计算和数据读入是异步的,合理的方式是主线程进行模型的训练,然后开一个数据读入线程异步读入数据.tensorflow会在内存中维护一个队列,然后数据线程异步从磁盘中将样本推入队列当中。并且,因为tensorflow的训练和读数据是异步的,故即使当前没有数据进来,tensorflow也没办法报错,因为可能接下来会有数据进队列,所以,tensorflow就一直处于等待的状态

说明:我是在修改Tensorflow的源码ptb_word_lm.py的时候遇到上述的问题的。下面就该源码来解释说明这个问题:

tensorflow的reader.py文件:

"""Utilities for parsing PTB text files."""
#-*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import collections
import os

import tensorflow as tf

#将文件中所有的word收集起来
def _read_words(filename):
  with tf.gfile.GFile(filename, "r") as f:
    return f.read().decode("utf-8").replace("\n", "<eos>").split()

#将收集到的word映射到id
def _build_vocab(filename):
  data = _read_words(filename)

  counter = collections.Counter(data)
  count_pairs = sorted(counter.items(), key=lambda x: (-x[1], x[0]))

  words, _ = list(zip(*count_pairs))
  word_to_id = dict(zip(words, range(len(words))))

  return word_to_id

#使用训练集的word建立word的映射表
#
def _file_to_word_ids(filename, word_to_id):
  data = _read_words(filename)
  return [word_to_id[word] for word in data if word in word_to_id]


def ptb_raw_data(data_path=None):
  """Load PTB raw data from data directory "data_path".

  Reads PTB text files, converts strings to integer ids,
  and performs mini-batching of the inputs.

  The PTB dataset comes from Tomas Mikolov's webpage:

  http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz

  Args:
    data_path: string path to the directory where simple-examples.tgz has
      been extracted.

  Returns:
    tuple (train_data, valid_data, test_data, vocabulary)
    where each of the data objects can be passed to PTBIterator.
  """

  train_path = os.path.join(data_path, "ptb.train.txt")
  valid_path = os.path.join(data_path, "ptb.valid.txt")
  test_path = os.path.join(data_path, "ptb.test.txt")

  word_to_id = _build_vocab(train_path)
  train_data = _file_to_word_ids(train_path, word_to_id)
  valid_data = _file_to_word_ids(valid_path, word_to_id)
  test_data = _file_to_word_ids(test_path, word_to_id)
  vocabulary = len(word_to_id)
  return train_data, valid_data, test_data, vocabulary


def ptb_producer(raw_data, batch_size, num_steps, name=None):
  """Iterate on the raw PTB data.

  This chunks up raw_data into batches of examples and returns Tensors that
  are drawn from these batches.

  Args:
    raw_data: one of the raw data outputs from ptb_raw_data.
    batch_size: int, the batch size.
    num_steps: int, the number of unrolls.
    name: the name of this operation (optional).

  Returns:
    A pair of Tensors, each shaped [batch_size, num_steps]. The second element
    of the tuple is the same data time-shifted to the right by one.

  Raises:
    tf.errors.InvalidArgumentError: if batch_size or num_steps are too high.
  """
  with tf.name_scope(name, "PTBProducer", [raw_data, batch_size, num_steps]):
    raw_data = tf.convert_to_tensor(raw_data, name="raw_data", dtype=tf.int32)

    data_len = tf.size(raw_data)
    batch_len = data_len // batch_size
    data = tf.reshape(raw_data[0 : batch_size * batch_len],
                      [batch_size, batch_len])

    epoch_size = (batch_len - 1) // num_steps
    assertion = tf.assert_positive(
        epoch_size,
        message="epoch_size == 0, decrease batch_size or num_steps")
    with tf.control_dependencies([assertion]):
      epoch_size = tf.identity(epoch_size, name="epoch_size")

    i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
    x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
    y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
    return x, y

说明:详解这个reader.py文件:

1、产生一个队列,里面的数是0到epoch_size-1.然后定义了一个出队操作,说明队列也是数据流图中的一个结点.使用了range_input_producer之后,会自动产生一个QueueRunner. QueueRunner for the Queue is added to the current Graph'sQUEUE_RUNNER collection.

i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
2、定义了切片操作,返回训练样本的x和y

    x = tf.slice(data, [0, i * num_steps], [batch_size, num_steps])
    y = tf.slice(data, [0, i * num_steps + 1], [batch_size, num_steps])
3、具体使用说明:

    在使用的过程中,只要每次迭代的时候,我们取一下x,y。那么,就会触发跟x,y相关联的操作,也即出队操作和切片操作,为我们生成数据.但是,通过队列的方式来读入数据都是一种多线程读入数据的方式,要在session当中将该线程开启,不然就会挂起。

二、分析错误的情况&相应的修改办法

1、错误的情况

#-*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader

class PTBInput(object):
  """The input data."""
  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    #为何要进行-1操作
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

if __name__ == '__main__':
	config = SmallConfig()
        data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'       
	raw_data = reader.ptb_raw_data(data_path)
	train_data, valid_data, test_data, _ = raw_data
	train_input = PTBInput(config=config, data=train_data, name="TrainInput")
        print "end--------------------------------"
        
	#wrong,使用session就会出现读不出数据的错误,读不出数据,整个数据流图就无法计算,整个程序就处于挂起的状态
	#使用session会出错
	with tf.Session() as sess:
		for step in range(1):
			print sess.run(train_input.input_data)	
说明:在Session当中,没有启动数据读入线程。所以,sess.run(train_input.input_data)就是无数据可取,程序就处于一种挂起的状态。

2、解决方案

#-*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader

class PTBInput(object):
  """The input data."""
  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    #为何要进行-1操作
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

if __name__ == '__main__':
	config = SmallConfig()
        data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'       
	raw_data = reader.ptb_raw_data(data_path)
	train_data, valid_data, test_data, _ = raw_data
	train_input = PTBInput(config=config, data=train_data, name="TrainInput")
        print "end--------------------------------"
        

	#right,使用Supervisor()
	#sv = tf.train.Supervisor()
        #with sv.managed_session() as sess:
	#	for step in range(1):
	#		print sess.run(train_input.input_data)	
        
	#right
	# Create a session for running operations in the Graph.
	sess = tf.Session()
	# Start input enqueue threads.
	coord = tf.train.Coordinator()
	threads = tf.train.start_queue_runners(sess=sess, coord=coord)
	# Run training steps or whatever
	try:
		for step in range(2):
			print sess.run(train_input.input_data)
	except Exception,e:
		#Report exceptions to the coordinator
		coord.request_stop(e)
	coord.request_stop()
	# Terminate as usual.  It is innocuous to request stop twice.
	coord.join(threads)
	sess.close()

说明:使用tf.train.range_input_producer(epoch_size, shuffle=False),会默认将 QueueRunner添加到全局图中,我们必须使用tf.train.start_queue_runners(sess=sess),去启动该线程。然后使用coord = tf.train.Coordinator()去做一些线程的同步工作。

3、解决方案:

#-*- coding:utf-8 -*-
import numpy as np
import tensorflow as tf

from tensorflow.models.rnn.ptb import reader

class PTBInput(object):
  """The input data."""
  def __init__(self, config, data, name=None):
    self.batch_size = batch_size = config.batch_size
    self.num_steps = num_steps = config.num_steps
    #为何要进行-1操作
    self.epoch_size = ((len(data) // batch_size) - 1) // num_steps
    self.input_data, self.targets = reader.ptb_producer(
        data, batch_size, num_steps, name=name)

class SmallConfig(object):
  """Small config."""
  init_scale = 0.1
  learning_rate = 1.0
  max_grad_norm = 5
  num_layers = 2
  num_steps = 20
  hidden_size = 200
  max_epoch = 4
  max_max_epoch = 13
  keep_prob = 1.0
  lr_decay = 0.5
  batch_size = 20
  vocab_size = 10000

if __name__ == '__main__':
	config = SmallConfig()
        data_path = '/home/jdlu/jdluTensor/data/simple-examples/data'       
	raw_data = reader.ptb_raw_data(data_path)
	train_data, valid_data, test_data, _ = raw_data
	train_input = PTBInput(config=config, data=train_data, name="TrainInput")
        print "end--------------------------------"
        

	#right,使用Supervisor()
	sv = tf.train.Supervisor()
        with sv.managed_session() as sess:
		for step in range(1):
			print sess.run(train_input.input_data)

说明:使用sv = tf.train.Supervisor()会比较方便,文档上说, The Supervisor is a small wrapper around a  Coordinator , a  Saver , and a  SessionManager

也即使用了Supervisor(),那么保存模型,线程同步的事情都不用我们去干涉了。