Reputation: 5551
This Cifar10 model, altered somewhat from the Cifar10 tutorial, seems to spike and then hang, never returning control of the program to the interpreter. It has been suggested that this is because I have not called `.start_queue_runners`. However, there doesn't seem to be any code in the original Cifar10 tutorial that calls `.start_queue_runners`, and it runs just fine. (I'm using the single-GPU training code, `cifar10_train.py`; the multiple-GPU code DOES call that method.)
import os
import tensorflow as tf
import functools
from urllib.request import urlretrieve
import zipfile
import tarfile
import sys
# Side length, in pixels, of the square images fed to the model. Note that this
# differs from the original CIFAR image size of 32 x 32. If one alters this
# number, then the entire model architecture will change and any model would
# need to be retrained.
IMAGE_SIZE = 24
# Global constants describing the CIFAR-10 data set.
# Number of label classes.
NUM_CLASSES = 10
# Example counts for the training and evaluation sets; passed to
# Cifar10Record.batched_input, which uses them to size the example queue.
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000
def maybe_download_and_extract():
    """Download and unpack the CIFAR-10 binary data set if it is missing.

    The archive is fetched into ``../data/``, extracted there, and the
    extracted ``cifar-10-batches-bin`` directory is renamed to
    ``../data/cifar_10/``. The downloaded archive is deleted afterwards.
    Nothing is downloaded when ``../data/cifar_10/`` already exists.

    Returns:
        A ``(train_files, test_files)`` tuple of lists of paths: the five
        ``data_batch_N.bin`` training files and the single
        ``test_batch.bin`` evaluation file.
    """

    def _report_progress(count, block_size, total_size):
        # Reporthook for urlretrieve. The hook the original code referenced
        # (print_download_progress) is not defined in this script, so the
        # first download failed with NameError; it is provided locally here.
        downloaded = count * block_size
        if total_size > 0:
            # The last block usually overshoots the file size; cap at 100%.
            fraction = min(float(downloaded) / total_size, 1.0)
            msg = "\r- Download progress: {0:.1%}".format(fraction)
        else:
            # urlretrieve reports a non-positive size when it is unknown.
            msg = "\r- Download progress: {0} bytes".format(downloaded)
        sys.stdout.write(msg)
        sys.stdout.flush()

    main_directory = "../data/"
    cifar_10_directory = main_directory + "cifar_10/"

    # exist_ok avoids the check-then-create race of the original code.
    os.makedirs(main_directory, exist_ok=True)

    if not os.path.exists(cifar_10_directory):
        url = "http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz"
        filename = url.split('/')[-1]
        archive_path = os.path.join(main_directory, filename)
        file_path, _ = urlretrieve(url=url, filename=archive_path,
                                   reporthook=_report_progress)
        print()
        print("Download finished. Extracting files.")
        # Context managers close the archive handle even if extraction fails.
        if file_path.endswith(".zip"):
            with zipfile.ZipFile(file=file_path, mode="r") as archive:
                archive.extractall(main_directory)
        elif file_path.endswith((".tar.gz", ".tgz")):
            with tarfile.open(name=file_path, mode="r:gz") as archive:
                archive.extractall(main_directory)
        print("Done.")
        os.rename(main_directory + "cifar-10-batches-bin", cifar_10_directory)
        os.remove(archive_path)

    train_files = [
        os.path.join(cifar_10_directory, 'data_batch_{}.bin'.format(i))
        for i in range(1, 6)
    ]
    test_files = [os.path.join(cifar_10_directory, 'test_batch.bin')]
    return (train_files, test_files)
class Cifar10Record(list):
    """Graph ops that read and preprocess one CIFAR-10 record at a time.

    Constructing an instance adds a filename queue, a fixed-length record
    reader and the decode/preprocess ops to the default TensorFlow graph.
    Data *moves through* these ops each time they are evaluated (much like a
    placeholder), so one instance is sufficient to feed a whole pipeline.

    NOTE(review): the ``list`` base class is kept only for backward
    compatibility; nothing in this class uses list behaviour.
    """

    label_bytes = 1  # 2 for CIFAR-100
    height = 32
    width = 32
    depth = 3
    # Every record consists of a label followed by the image, with a
    # fixed number of bytes for each.
    image_bytes = height * width * depth
    record_bytes = label_bytes + image_bytes

    def __init__(self, filenames):
        """Build the graph that reads and decodes a single record.

        Args:
            filenames: List of paths to CIFAR-10 binary batch files.
        """
        super().__init__()
        # 1st part of the graph: a queue of filenames and a record reader
        # that pulls filenames from it.
        self.filename_queue = tf.train.string_input_producer(filenames)
        self.reader = tf.FixedLengthRecordReader(
            record_bytes=Cifar10Record.record_bytes)
        # 2nd part: the reader reads one record from the queued files.
        self.key, value = self.reader.read(self.filename_queue)

        # Convert from a string to a vector of uint8 that is record_bytes long.
        record = tf.decode_raw(value, tf.uint8)

        # The first bytes represent the label, which we convert from
        # uint8 -> int32 and then to a one-hot row of shape [1, NUM_CLASSES].
        label = tf.cast(
            tf.strided_slice(record, [0], [Cifar10Record.label_bytes]),
            tf.int32)
        label.set_shape([1])
        self.label = tf.one_hot(label, NUM_CLASSES, on_value=1.0, off_value=0.0)

        # The remaining bytes after the label represent the image, which we
        # reshape from [depth * height * width] to [depth, height, width].
        depth_major = tf.reshape(
            tf.strided_slice(record, [Cifar10Record.label_bytes],
                             [Cifar10Record.record_bytes]),
            [Cifar10Record.depth, Cifar10Record.height, Cifar10Record.width])
        # Convert from [depth, height, width] to [height, width, depth].
        self.uint8image = tf.transpose(depth_major, [1, 2, 0])
        self.float32image = tf.cast(self.uint8image, tf.float32)

        # No distorted branch is built; batched_input(distorted=True) reports
        # that explicitly.
        self.image = self.undistorted_input_branch()

    def undistorted_input_branch(self):
        """Construct the evaluation-style (undistorted) image tensor.

        Returns:
            A float32 tensor of shape [IMAGE_SIZE, IMAGE_SIZE, 3].
        """
        height = IMAGE_SIZE
        width = IMAGE_SIZE

        # Crop the central [height, width] of the image.
        resized_image = tf.image.resize_image_with_crop_or_pad(
            self.float32image, height, width)

        # Subtract off the mean and divide by the (adjusted) standard
        # deviation of the pixels -- not the variance.
        float_image = tf.image.per_image_standardization(resized_image)

        # Set the static shape so downstream batching ops know it.
        float_image.set_shape([height, width, 3])
        return float_image

    def batched_input(self, num_examples_per_epoch, batch_size, shuffle=True, distorted=False):
        """Construct a queued batch of images and labels.

        The returned tensors are fed by queue runners; those must be started
        (e.g. with ``tf.train.start_queue_runners``) before the tensors are
        evaluated, otherwise evaluation blocks.

        Args:
            num_examples_per_epoch: Number of examples in the data set; 40%
                of it is kept in the queue as a minimum for good mixing.
            batch_size: Number of examples per batch.
            shuffle: If True use a shuffling queue, otherwise a FIFO batch
                queue that preserves read order.
            distorted: If True use ``self.distorted_image`` instead of
                ``self.image``.

        Returns:
            images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
            labels: One-hot labels. 2D tensor of [batch_size, NUM_CLASSES] size.

        Raises:
            NotImplementedError: If ``distorted`` is True but no distorted
                image branch has been built on this instance.
        """
        if distorted:
            image = getattr(self, "distorted_image", None)
            if image is None:
                # Previously surfaced as an opaque AttributeError.
                raise NotImplementedError(
                    "distorted=True requested but no distorted_image branch "
                    "has been built for this Cifar10Record")
        else:
            image = self.image
        label = self.label

        # Ensure that the random shuffling has good mixing properties.
        min_fraction_of_examples_in_queue = 0.4
        min_queue_examples = int(num_examples_per_epoch *
                                 min_fraction_of_examples_in_queue)
        print('Filling queue with %d CIFAR images before starting to train. '
              'This will take a few minutes.' % min_queue_examples)

        num_preprocess_threads = 16
        capacity = min_queue_examples + 3 * batch_size
        if shuffle:
            # Create a queue that shuffles the examples, and then read
            # 'batch_size' images + labels from the example queue.
            images, label_batch = tf.train.shuffle_batch(
                [image, label],
                batch_size=batch_size,
                num_threads=num_preprocess_threads,
                capacity=capacity,
                min_after_dequeue=min_queue_examples)
        else:
            # The 'shuffle' flag used to be ignored; honour it.
            images, label_batch = tf.train.batch(
                [image, label],
                batch_size=batch_size,
                num_threads=num_preprocess_threads,
                capacity=capacity)

        # Each label is [1, NUM_CLASSES]; drop the singleton axis per example.
        return images, tf.reshape(label_batch, [batch_size, NUM_CLASSES])
# Driver: build the input pipelines and try to pull one training batch.
train_files, test_files = maybe_download_and_extract()
train_data = Cifar10Record(train_files).batched_input(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN, 100)
test_data = Cifar10Record(test_files).batched_input(NUM_EXAMPLES_PER_EPOCH_FOR_EVAL, 100)
with tf.Session() as sess:
    # NOTE: no queue runners are started in this session, so nothing ever
    # fills the input queues and this run() call blocks. This is the hang
    # the question describes; it is intentionally left as-is here.
    sess.run(train_data)
    input("foo!")
Upvotes: 0
Views: 205
Reputation: 5551
Anyone interested in reading Cifar10 records in the "new way" with the Dataset API might benefit from this answer:
import os
import tensorflow as tf
import functools
from urllib.request import urlretrieve
import zipfile
import tarfile
import sys
# Side length, in pixels, of the square images fed to the model. Note that this
# differs from the original CIFAR image size of 32 x 32. If one alters this
# number, then the entire model architecture will change and any model would
# need to be retrained.
IMAGE_SIZE = 24
# Size units used for the buffer_size arguments of the dataset pipeline.
BYTE = 1
KB = 1024*BYTE
MB = 1024*KB
# Global constants describing the CIFAR-10 data set.
# Number of label classes.
NUM_CLASSES = 10
# Example counts for the training and evaluation sets.
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000
def print_download_progress(count, block_size, total_size):
    """Report download progress on stdout; usable as a urlretrieve reporthook.

    Args:
        count: Number of blocks transferred so far.
        block_size: Size of each block in bytes.
        total_size: Total size of the file in bytes; zero or negative when
            the size is unknown.

    The message starts with a carriage return and has no newline, so
    successive calls overwrite the same terminal line.
    """
    downloaded = count * block_size
    if total_size > 0:
        # The final block usually overshoots the file size; cap at 100%.
        pct_complete = min(float(downloaded) / total_size, 1.0)
        msg = "\r- Download progress: {0:.1%}".format(pct_complete)
    else:
        # Unknown size: a percentage would divide by zero or come out
        # negative, so report the byte count instead.
        msg = "\r- Download progress: {0} bytes".format(downloaded)
    sys.stdout.write(msg)
    sys.stdout.flush()
def maybe_download_and_extract():
    """Download and unpack the CIFAR-10 binary data set if it is missing.

    The archive is fetched into ``../data/``, extracted there, and the
    extracted ``cifar-10-batches-bin`` directory is renamed to
    ``../data/cifar_10/``. The downloaded archive is deleted afterwards.
    Nothing is downloaded when ``../data/cifar_10/`` already exists.

    Returns:
        A ``(train_files, test_files)`` tuple of lists of paths: the five
        ``data_batch_N.bin`` training files and the single
        ``test_batch.bin`` evaluation file.
    """
    main_directory = "../data/"
    cifar_10_directory = main_directory + "cifar_10/"

    # exist_ok avoids the check-then-create race of the original code.
    os.makedirs(main_directory, exist_ok=True)

    if not os.path.exists(cifar_10_directory):
        url = "http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz"
        filename = url.split('/')[-1]
        archive_path = os.path.join(main_directory, filename)
        file_path, _ = urlretrieve(url=url, filename=archive_path,
                                   reporthook=print_download_progress)
        print()
        print("Download finished. Extracting files.")
        # Context managers close the archive handle even if extraction fails.
        if file_path.endswith(".zip"):
            with zipfile.ZipFile(file=file_path, mode="r") as archive:
                archive.extractall(main_directory)
        elif file_path.endswith((".tar.gz", ".tgz")):
            with tarfile.open(name=file_path, mode="r:gz") as archive:
                archive.extractall(main_directory)
        print("Done.")
        os.rename(main_directory + "cifar-10-batches-bin", cifar_10_directory)
        os.remove(archive_path)

    train_files = [
        os.path.join(cifar_10_directory, 'data_batch_{}.bin'.format(i))
        for i in range(1, 6)
    ]
    test_files = [os.path.join(cifar_10_directory, 'test_batch.bin')]
    return (train_files, test_files)
class Cifar10Record(object):
    """tf.data input pipeline over CIFAR-10 binary batch files.

    Constructing an instance builds a dataset that reads fixed-length
    records, decodes them into (image, one-hot label) pairs, shuffles and
    batches them, and exposes an initializable iterator:

    * ``self.iterator.initializer`` must be run once before reading;
    * ``self.next_input`` yields the next ``(images, labels)`` batch.
    """

    label_bytes = 1  # 2 for CIFAR-100
    height = 32
    width = 32
    depth = 3
    # Every record consists of a label followed by the image, with a
    # fixed number of bytes for each.
    image_bytes = height * width * depth
    record_bytes = label_bytes + image_bytes

    def __init__(self, filenames, batch_size=100,
                 shuffle_buffer_size=NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN):
        """Build the dataset graph.

        Args:
            filenames: List of paths to CIFAR-10 binary batch files.
            batch_size: Number of examples per batch (previously fixed at 100).
            shuffle_buffer_size: Number of *examples* held in the shuffle
                buffer. The old code passed ``1*MB`` here, but this argument
                counts elements, not bytes. The default covers the whole
                training set, so shuffling stays uniform as before.
        """
        # The leftover tf.train.string_input_producer call was removed: it
        # added an unused filename queue (and queue runner) to the graph.

        # Here buffer_size really is in bytes: it is the file read buffer.
        self.dataset = tf.data.FixedLengthRecordDataset(
            filenames, Cifar10Record.record_bytes, buffer_size=1*MB)
        self.dataset = self.dataset.map(Cifar10Record._parse_function)
        self.dataset = self.dataset.shuffle(buffer_size=shuffle_buffer_size)
        self.dataset = self.dataset.batch(batch_size)
        self.iterator = self.dataset.make_initializable_iterator()
        self.next_input = self.iterator.get_next()

    @staticmethod
    def _parse_function(value):
        """Decode one raw record into a float image and a one-hot label.

        Args:
            value: Scalar string tensor holding ``record_bytes`` raw bytes.

        Returns:
            ``(float32image, label)``: the image as a float32 tensor laid out
            [height, width, depth], and the one-hot label of shape
            [1, NUM_CLASSES].
        """
        record = tf.decode_raw(value, tf.uint8)

        # The first bytes represent the label, which we convert from
        # uint8 -> int32.
        label = tf.cast(
            tf.strided_slice(record, [0], [Cifar10Record.label_bytes]),
            tf.int32)
        label.set_shape([1])
        label = tf.one_hot(label, NUM_CLASSES, on_value=1.0, off_value=0.0)

        # The remaining bytes after the label represent the image, which we
        # reshape from [depth * height * width] to [depth, height, width].
        depth_major = tf.reshape(
            tf.strided_slice(record, [Cifar10Record.label_bytes],
                             [Cifar10Record.record_bytes]),
            [Cifar10Record.depth, Cifar10Record.height, Cifar10Record.width])
        # Convert from [depth, height, width] to [height, width, depth].
        uint8image = tf.transpose(depth_major, [1, 2, 0])
        float32image = tf.cast(uint8image, tf.float32)
        return float32image, label
# Driver: build both pipelines and print one training batch.
train_files, test_files = maybe_download_and_extract()
train_data = Cifar10Record(train_files)
test_data = Cifar10Record(test_files)
with tf.Session() as sess:
    # An initializable iterator has to be initialized before its
    # get_next() tensor can be evaluated.
    sess.run(train_data.iterator.initializer)
    print(sess.run(train_data.next_input))
Upvotes: 0
Reputation: 5551
According to a comment from @de1, the reason is that a `MonitoredTrainingSession` calls `start_queue_runners` or does something like it. Here is functioning code that starts the queue runners properly:
import os
import tensorflow as tf
import functools
from urllib.request import urlretrieve
import zipfile
import tarfile
import sys
# Side length, in pixels, of the square images fed to the model. Note that this
# differs from the original CIFAR image size of 32 x 32. If one alters this
# number, then the entire model architecture will change and any model would
# need to be retrained.
IMAGE_SIZE = 24
# Global constants describing the CIFAR-10 data set.
# Number of label classes.
NUM_CLASSES = 10
# Example counts for the training and evaluation sets; passed to
# Cifar10Record.batched_input, which uses them to size the example queue.
NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 50000
NUM_EXAMPLES_PER_EPOCH_FOR_EVAL = 10000
def maybe_download_and_extract():
    """Download and unpack the CIFAR-10 binary data set if it is missing.

    The archive is fetched into ``../data/``, extracted there, and the
    extracted ``cifar-10-batches-bin`` directory is renamed to
    ``../data/cifar_10/``. The downloaded archive is deleted afterwards.
    Nothing is downloaded when ``../data/cifar_10/`` already exists.

    Returns:
        A ``(train_files, test_files)`` tuple of lists of paths: the five
        ``data_batch_N.bin`` training files and the single
        ``test_batch.bin`` evaluation file.
    """

    def _report_progress(count, block_size, total_size):
        # Reporthook for urlretrieve. The hook the original code referenced
        # (print_download_progress) is not defined in this script, so the
        # first download failed with NameError; it is provided locally here.
        downloaded = count * block_size
        if total_size > 0:
            # The last block usually overshoots the file size; cap at 100%.
            fraction = min(float(downloaded) / total_size, 1.0)
            msg = "\r- Download progress: {0:.1%}".format(fraction)
        else:
            # urlretrieve reports a non-positive size when it is unknown.
            msg = "\r- Download progress: {0} bytes".format(downloaded)
        sys.stdout.write(msg)
        sys.stdout.flush()

    main_directory = "../data/"
    cifar_10_directory = main_directory + "cifar_10/"

    # exist_ok avoids the check-then-create race of the original code.
    os.makedirs(main_directory, exist_ok=True)

    if not os.path.exists(cifar_10_directory):
        url = "http://www.cs.toronto.edu/~kriz/cifar-10-binary.tar.gz"
        filename = url.split('/')[-1]
        archive_path = os.path.join(main_directory, filename)
        file_path, _ = urlretrieve(url=url, filename=archive_path,
                                   reporthook=_report_progress)
        print()
        print("Download finished. Extracting files.")
        # Context managers close the archive handle even if extraction fails.
        if file_path.endswith(".zip"):
            with zipfile.ZipFile(file=file_path, mode="r") as archive:
                archive.extractall(main_directory)
        elif file_path.endswith((".tar.gz", ".tgz")):
            with tarfile.open(name=file_path, mode="r:gz") as archive:
                archive.extractall(main_directory)
        print("Done.")
        os.rename(main_directory + "cifar-10-batches-bin", cifar_10_directory)
        os.remove(archive_path)

    train_files = [
        os.path.join(cifar_10_directory, 'data_batch_{}.bin'.format(i))
        for i in range(1, 6)
    ]
    test_files = [os.path.join(cifar_10_directory, 'test_batch.bin')]
    return (train_files, test_files)
class Cifar10Record(list):
    """Graph ops that read and preprocess one CIFAR-10 record at a time.

    Constructing an instance adds a filename queue, a fixed-length record
    reader and the decode/preprocess ops to the default TensorFlow graph.
    Data *moves through* these ops each time they are evaluated (much like a
    placeholder), so one instance is sufficient to feed a whole pipeline.

    NOTE(review): the ``list`` base class is kept only for backward
    compatibility; nothing in this class uses list behaviour.
    """

    label_bytes = 1  # 2 for CIFAR-100
    height = 32
    width = 32
    depth = 3
    # Every record consists of a label followed by the image, with a
    # fixed number of bytes for each.
    image_bytes = height * width * depth
    record_bytes = label_bytes + image_bytes

    def __init__(self, filenames):
        """Build the graph that reads and decodes a single record.

        Args:
            filenames: List of paths to CIFAR-10 binary batch files.
        """
        super().__init__()
        # 1st part of the graph: a queue of filenames and a record reader
        # that pulls filenames from it.
        self.filename_queue = tf.train.string_input_producer(filenames)
        self.reader = tf.FixedLengthRecordReader(
            record_bytes=Cifar10Record.record_bytes)
        # 2nd part: the reader reads one record from the queued files.
        self.key, value = self.reader.read(self.filename_queue)

        # Convert from a string to a vector of uint8 that is record_bytes long.
        record = tf.decode_raw(value, tf.uint8)

        # The first bytes represent the label, which we convert from
        # uint8 -> int32 and then to a one-hot row of shape [1, NUM_CLASSES].
        label = tf.cast(
            tf.strided_slice(record, [0], [Cifar10Record.label_bytes]),
            tf.int32)
        label.set_shape([1])
        self.label = tf.one_hot(label, NUM_CLASSES, on_value=1.0, off_value=0.0)

        # The remaining bytes after the label represent the image, which we
        # reshape from [depth * height * width] to [depth, height, width].
        depth_major = tf.reshape(
            tf.strided_slice(record, [Cifar10Record.label_bytes],
                             [Cifar10Record.record_bytes]),
            [Cifar10Record.depth, Cifar10Record.height, Cifar10Record.width])
        # Convert from [depth, height, width] to [height, width, depth].
        self.uint8image = tf.transpose(depth_major, [1, 2, 0])
        self.float32image = tf.cast(self.uint8image, tf.float32)

        # No distorted branch is built; batched_input(distorted=True) reports
        # that explicitly.
        self.image = self.undistorted_input_branch()

    def undistorted_input_branch(self):
        """Construct the evaluation-style (undistorted) image tensor.

        Returns:
            A float32 tensor of shape [IMAGE_SIZE, IMAGE_SIZE, 3].
        """
        height = IMAGE_SIZE
        width = IMAGE_SIZE

        # Crop the central [height, width] of the image.
        resized_image = tf.image.resize_image_with_crop_or_pad(
            self.float32image, height, width)

        # Subtract off the mean and divide by the (adjusted) standard
        # deviation of the pixels -- not the variance.
        float_image = tf.image.per_image_standardization(resized_image)

        # Set the static shape so downstream batching ops know it.
        float_image.set_shape([height, width, 3])
        return float_image

    def batched_input(self, num_examples_per_epoch, batch_size, shuffle=True, distorted=False):
        """Construct a queued batch of images and labels.

        The returned tensors are fed by queue runners; those must be started
        (e.g. with ``tf.train.start_queue_runners``) before the tensors are
        evaluated, otherwise evaluation blocks.

        Args:
            num_examples_per_epoch: Number of examples in the data set; 40%
                of it is kept in the queue as a minimum for good mixing.
            batch_size: Number of examples per batch.
            shuffle: If True use a shuffling queue, otherwise a FIFO batch
                queue that preserves read order.
            distorted: If True use ``self.distorted_image`` instead of
                ``self.image``.

        Returns:
            images: Images. 4D tensor of [batch_size, IMAGE_SIZE, IMAGE_SIZE, 3] size.
            labels: One-hot labels. 2D tensor of [batch_size, NUM_CLASSES] size.

        Raises:
            NotImplementedError: If ``distorted`` is True but no distorted
                image branch has been built on this instance.
        """
        if distorted:
            image = getattr(self, "distorted_image", None)
            if image is None:
                # Previously surfaced as an opaque AttributeError.
                raise NotImplementedError(
                    "distorted=True requested but no distorted_image branch "
                    "has been built for this Cifar10Record")
        else:
            image = self.image
        label = self.label

        # Ensure that the random shuffling has good mixing properties.
        min_fraction_of_examples_in_queue = 0.4
        min_queue_examples = int(num_examples_per_epoch *
                                 min_fraction_of_examples_in_queue)
        print('Filling queue with %d CIFAR images before starting to train. '
              'This will take a few minutes.' % min_queue_examples)

        num_preprocess_threads = 16
        capacity = min_queue_examples + 3 * batch_size
        if shuffle:
            # Create a queue that shuffles the examples, and then read
            # 'batch_size' images + labels from the example queue.
            images, label_batch = tf.train.shuffle_batch(
                [image, label],
                batch_size=batch_size,
                num_threads=num_preprocess_threads,
                capacity=capacity,
                min_after_dequeue=min_queue_examples)
        else:
            # The 'shuffle' flag used to be ignored; honour it.
            images, label_batch = tf.train.batch(
                [image, label],
                batch_size=batch_size,
                num_threads=num_preprocess_threads,
                capacity=capacity)

        # Each label is [1, NUM_CLASSES]; drop the singleton axis per example.
        return images, tf.reshape(label_batch, [batch_size, NUM_CLASSES])
# Driver: build the input pipelines, start the queue runners, and pull one
# training batch.
train_files, test_files = maybe_download_and_extract()
train_queue = Cifar10Record(train_files)
train_data = train_queue.batched_input(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN, 100)
test_queue = Cifar10Record(test_files)
test_data = test_queue.batched_input(NUM_EXAMPLES_PER_EPOCH_FOR_EVAL, 100)
with tf.Session() as sess:
    # The queue runners are what actually fill the input queues; without
    # them sess.run(train_data) blocks. A Coordinator lets the runner
    # threads be stopped and joined before the session is closed, instead of
    # leaving them running against a closed session.
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        sess.run(train_data)
        input("foo!")
    finally:
        coord.request_stop()
        coord.join(threads)
Upvotes: 0