ericyue

Reputation: 656

TFRecordReader seems extremely slow, and multi-threaded reading is not working

My training process uses the TFRecord format for the train & eval datasets.

I benchmarked the reader: only 8000 records/second, and the I/O speed (as seen from the iotop command) is just 400KB-500KB/s.
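For context, a quick back-of-the-envelope check in plain Python (the 450 KB/s figure is an assumed midpoint of the observed range) shows how small each record is on average, which suggests per-record overhead rather than disk bandwidth is the limit:

```python
# Sanity check on the numbers above (450 KB/s is an assumed midpoint
# of the observed 400-500 KB/s range).
records_per_s = 8000
io_bytes_per_s = 450 * 1024
bytes_per_record = io_bytes_per_s / records_per_s
print(round(bytes_per_record))  # -> 58 bytes per record on average
```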

I'm using the C++ version of protobuf, as described here:

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/get_started/os_setup.md#protobuf-library-related-issues

Here is a minimal reproducible example:

def read_and_decode(filename_queue):
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    return serialized_example

serialized_example = read_and_decode(filename_queue)
batch_serialized_example = tf.train.shuffle_batch(
    [serialized_example],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue)
features = tf.parse_example(
    batch_serialized_example,
    features={
        "label": tf.FixedLenFeature([], tf.float32),
        "ids": tf.VarLenFeature(tf.int64),
        "values": tf.VarLenFeature(tf.float32),
    })

What I have tried so far:

I tried setting num_threads in tf.train.shuffle_batch, but it did not help.

It seems that with 2 threads it runs at 8000 records/s, and when I enlarge the thread number it gets slower. (I removed all ops that cost CPU; the graph just reads data.)

My server has 24 CPU cores.

Upvotes: 9

Views: 3305

Answers (3)

Aderlar

Reputation: 11

An addendum to Yaroslav's answer: You can use tf.python_io.tf_record_iterator to iterate through the examples in order to append them to a list, which you can then pass to tf.train.shuffle_batch with enqueue_many=True:

queue_batch = []
for serialized_example in tf.python_io.tf_record_iterator(
        filename,
        options=tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB)):
    queue_batch.append(serialized_example)
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    enqueue_many=True)

It seems that trying to iterate through examples by using reader.read() will result in one read per batch, i.e. the nth batch will be batch_num copies of the nth record rather than batch_num unique records.

Upvotes: 1

Erik Shilts

Reputation: 4509

Here's a simple speedup building on Yaroslav's answer:

TensorFlow has a built-in function, tf.TFRecordReader.read_up_to, that reads multiple records per session.run() call, thereby removing the excess overhead caused by making many calls.

enqueue_many_size = SOME_ENQUEUE_MANY_SIZE
reader = tf.TFRecordReader(
    options=tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB))
_, queue_batch = reader.read_up_to(filename_queue, enqueue_many_size)
batch_serialized_example = tf.train.shuffle_batch(
    [queue_batch],
    batch_size=batch_size,
    num_threads=thread_number,
    capacity=capacity,
    min_after_dequeue=min_after_dequeue,
    enqueue_many=True)

As with Yaroslav's answer, you need to set enqueue_many=True so that the batch function knows it is accepting multiple records.

This was very fast in my use case.

Upvotes: 5

Yaroslav Bulatov

Reputation: 57923

The issue here is that there's a fixed cost overhead to each session.run, so filling the queue with many tiny examples one at a time will be slow.

In particular, each session.run takes about 100-200 usec, so you can only do about 5k-10k session.run calls per second.
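To make that arithmetic concrete, here's a minimal sketch (assuming ~150 usec per call, the midpoint of the range above, and a hypothetical batch of 100 records per call):

```python
# Fixed per-call overhead caps throughput when each session.run
# delivers a single record.
overhead_s = 150e-6              # assumed ~150 usec per session.run call
print(int(1 / overhead_s))       # -> 6666 records/s ceiling, one record per call

# Enqueuing a batch of records per call raises the ceiling proportionally.
batch = 100                      # hypothetical enqueue_many batch size
print(int(batch / overhead_s))   # -> 666666 records/s ceiling
```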

This problem is obvious when doing Python profiling (python -m cProfile), but hard to see when starting from a timeline profile or CPU profile.

The work-around is to use enqueue_many to add things to your queue in batches. I took your benchmark from https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8 and modified it to enqueue many items per .run call, and that gives 10x speed-up.

The modification is to change the tf.train.shuffle_batch call as follows:

if enqueue_many:
    reader = tf.TFRecordReader(
        options=tf.python_io.TFRecordOptions(tf.python_io.TFRecordCompressionType.ZLIB))
    queue_batch = []
    for i in range(enqueue_many_size):
        _, serialized_example = reader.read(filename_queue)
        queue_batch.append(serialized_example)
    batch_serialized_example = tf.train.shuffle_batch(
        [queue_batch],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        enqueue_many=True)

For complete source, check here: https://github.com/yaroslavvb/stuff/blob/master/ericyue-slowreader/benchmark.py

It's hard to optimize this to go much faster, since most of the time is now spent in queue operations. Looking at a stripped-down version which just adds integers to a queue, you get similar speed; looking at the timeline, the time is spent in dequeue ops.

[Timeline profile screenshot: time dominated by dequeue ops]

Each dequeue op takes about 60 usec, but there are on average 5 running in parallel, so you get 12 usec per dequeue. That means you'll get fewer than 200k examples per second in the best case.
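The arithmetic behind that ceiling, as a small sketch using the figures above:

```python
dequeue_us = 60.0                 # usec per dequeue op (from the timeline)
parallel = 5                      # average number of dequeues running concurrently
effective_us = dequeue_us / parallel   # 12 usec of wall time per dequeue
print(int(1e6 / effective_us))    # -> 83333 dequeues/s, under the 200k/s bound
```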

Upvotes: 8
