Reputation: 39
I tried writing TFRecord files with and without multithreading, and found that the speed difference is not large (with 4 threads: 434 seconds; without multithreading: 590 seconds). I'm not sure whether I used it correctly. Is there a better way to write TFRecord files faster?
import tensorflow as tf
import numpy as np
import threading
import time

def generate_data(shape=[15, 28, 60, 1]):
    return np.random.uniform(size=shape)

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def write_instances_to_tfrecord(tfrecord_file, filenames):
    tfrecord_writer = tf.python_io.TFRecordWriter(tfrecord_file)
    for i, filename in enumerate(filenames):
        curr_MFCC = generate_data()
        curr_MFCC_raw = curr_MFCC.tostring()
        curr_filename_raw = str(filename) + '-' + str(i)
        example = tf.train.Example(features=tf.train.Features(
            feature={
                'MFCC': _bytes_feature(curr_MFCC_raw),
                'filename': _bytes_feature(curr_filename_raw)
            })
        )
        tfrecord_writer.write(example.SerializeToString())
    tfrecord_writer.close()

def test():
    threading_start = time.time()
    coord = tf.train.Coordinator()
    threads = []
    for thread_index in xrange(4):
        args = (str(thread_index), range(200000))
        t = threading.Thread(target=write_instances_to_tfrecord, args=args)
        t.start()
        threads.append(t)
    coord.join(threads)
    print 'w/ threading takes', time.time() - threading_start

    start = time.time()
    write_instances_to_tfrecord('5', range(800000))
    print 'w/o threading takes', time.time() - start

if __name__ == '__main__':
    test()
Upvotes: 4
Views: 2726
Reputation: 971
When using Python threading, the GIL in the CPython implementation caps CPU utilization at roughly one core for CPU-bound work like this. No matter how many threads you add, you will not see a speed-up.
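You can verify this independently of TFRecords. Here is a minimal sketch (not from the original post; cpu_bound and timed are hypothetical helpers) that splits the same fixed amount of pure-Python CPU work across 1 and 4 threads; the wall-clock times come out roughly the same because the threads serialize on the GIL:

import threading
import time

def cpu_bound(n):
    # Pure-Python CPU work that never releases the GIL.
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(num_threads, total_work=8000000):
    # Split the same total amount of work across num_threads threads.
    per_thread = total_work // num_threads
    threads = [threading.Thread(target=cpu_bound, args=(per_thread,))
               for _ in range(num_threads)]
    start = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.time() - start

print('1 thread : %.2f s' % timed(1))
print('4 threads: %.2f s' % timed(4))  # roughly the same as 1 thread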
A simple solution in your case would be to use the multiprocessing module. The code is almost exactly the same as what you have; just switch the threads to processes:
from multiprocessing import Process

coord = tf.train.Coordinator()
processes = []
for thread_index in xrange(4):
    args = (str(thread_index), range(200000))
    p = Process(target=write_instances_to_tfrecord, args=args)
    p.start()
    processes.append(p)
coord.join(processes)
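Side note: tf.train.Coordinator is designed for coordinating threads; a plain loop calling p.join() on each process works just as well here and drops the coordinator dependency. Since each process writes its own shard file, the shards can later be read back together with a file-pattern input pipeline.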
I tested this on my own TFRecord writer code and got a speed-up that scales linearly with the number of processes. The total number of processes is limited by memory.
Upvotes: 3
Reputation: 39
It's better to use the TensorFlow computation graph to take advantage of multithreading, since each session and graph can be run in a different thread. With the computation-graph approach, it's about 40 times faster.
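For reference, a minimal sketch of one way this can look (my own illustration, not the original poster's code; it assumes TF 1.13+ where tf.data.experimental.TFRecordWriter and tf.io.serialize_tensor are available, and it writes serialized tensors per shard rather than the tf.train.Example protos from the question):

import threading
import tensorflow as tf

def write_shard(shard_path, num_examples):
    # Each thread builds its own graph and session; the serialization and
    # file writing run as graph ops in the TF C++ runtime, outside the GIL.
    with tf.Graph().as_default():
        dataset = tf.data.Dataset.range(num_examples).map(
            lambda _: tf.io.serialize_tensor(tf.random.uniform([15, 28, 60, 1])),
            num_parallel_calls=4)
        write_op = tf.data.experimental.TFRecordWriter(shard_path).write(dataset)
        with tf.Session() as sess:
            sess.run(write_op)

threads = [threading.Thread(target=write_shard,
                            args=('shard-%d.tfrecord' % i, 200000))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()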
Upvotes: -1