Reputation: 1859
I would like to use multiple processes (not threads) to do some preprocessing and enqueue the results into a tf.RandomShuffleQueue, which my main graph can then use for training. Is there a way to do that?
I have converted my dataset into TFRecords split across 256 shards. I want to start 20 processes using multiprocessing and let each of them process a range of shards. Each process should read the images, augment them, and push them into a tf.RandomShuffleQueue from which the input can be fed to a graph for training.
Some people advised me to go through the inception example in tensorflow. However, that is a very different situation, because there only the reading of the data shards is done by multiple threads (not processes), while the preprocessing (e.g. augmentation) takes place in the main thread.
Upvotes: 5
Views: 694
Reputation: 12577
(This aims to solve your actual problem)
In another topic, someone told you that Python has the global interpreter lock (GIL) and that there would therefore be no speed benefit from multiple cores unless you used multiple processes. This was probably what prompted your desire to use multiprocessing.
However, with TF, Python is normally used only to construct the "graph". The actual execution happens in native code (or on the GPU), where the GIL plays no role whatsoever.
In light of this, I recommend simply letting TF use multithreading. This can be controlled using the intra_op_parallelism_threads argument, such as:
with tf.Session(graph=graph,
                config=tf.ConfigProto(allow_soft_placement=True,
                                      intra_op_parallelism_threads=20)) as sess:
    # ...
(Side note: if you have, say, a 2-CPU, 32-core system, the best argument may very well be intra_op_parallelism_threads=16, depending on a lot of factors.)
Upvotes: 1
Reputation: 15050
It seems the recommended way to run TF with multiprocessing is to create a separate tf.Session for each child process, since sharing it across processes is unfeasible. You can take a look at this example; I hope it helps.
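For illustration, here is a minimal sketch of that idea (not the linked example itself); worker_fn, the dummy op, and the shard ranges are made-up placeholders:
import multiprocessing as mp
import tensorflow as tf

def worker_fn(shard_files):
    # Each child process builds its own graph and owns its own tf.Session;
    # nothing TF-related is shared with the parent process.
    graph = tf.Graph()
    with graph.as_default():
        # ... build the per-process reading/augmentation ops here ...
        num_shards = tf.constant(len(shard_files))
    with tf.Session(graph=graph) as sess:
        print(sess.run(num_shards))

if __name__ == '__main__':
    # Hypothetical shard ranges, one list per child process.
    shard_ranges = [['shard-000', 'shard-001'], ['shard-002', 'shard-003']]
    procs = [mp.Process(target=worker_fn, args=(files,)) for files in shard_ranges]
    for p in procs:
        p.start()
    for p in procs:
        p.join()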
[EDIT: Old answer]
You can use a multiprocessing.Pool and rely on its callback mechanism to put results into the tf.RandomShuffleQueue as soon as they are ready.
Here's a very simple example of how to do it.
from multiprocessing import Pool

class Processor(object):
    def __init__(self, random_shuffle_queue):
        self.queue = random_shuffle_queue
        self.pool = Pool()

    def schedule_task(self, task):
        # processing_function must be a picklable, module-level function
        # that does the actual preprocessing of `task` in a worker process.
        self.pool.apply_async(processing_function, args=[task], callback=self.task_done)

    def task_done(self, results):
        # The callback runs back in the parent process, which owns the TF queue.
        self.queue.enqueue(results)
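A hypothetical usage sketch (not part of the original answer; the queue parameters and shard names are arbitrary):
import tensorflow as tf

queue = tf.RandomShuffleQueue(capacity=1000, min_after_dequeue=100, dtypes=[tf.float32])
processor = Processor(queue)
for shard in ['shard-000', 'shard-001']:
    processor.schedule_task(shard)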
This assumes Python 2; for Python 3 I'd recommend using concurrent.futures.ProcessPoolExecutor.
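For reference, a minimal sketch of the same idea with concurrent.futures (an assumption on my part, not part of the original answer), with processing_function being the same hypothetical preprocessing function:
from concurrent.futures import ProcessPoolExecutor

class Processor(object):
    def __init__(self, random_shuffle_queue):
        self.queue = random_shuffle_queue
        self.executor = ProcessPoolExecutor()

    def schedule_task(self, task):
        future = self.executor.submit(processing_function, task)
        # The done-callback fires in the parent process once the worker finishes.
        future.add_done_callback(lambda f: self.queue.enqueue(f.result()))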
Upvotes: 0
Reputation: 15533
Comment: The pickling of TFRecords is not that important. I can pass a list of lists containing names of ranges of sharded TFRecord files.
In that case I have to rethink the approach!
Comment: I can pass it to a Pool.map() as an argument.
Verify whether a multiprocessing.Queue() can handle this. The results of Tensor functions are Tensor objects.
Try the following:
import multiprocessing

# `func` and `TFRecord` are the preprocessing function and record discussed
# above; check whether the resulting Tensor object survives the queue round trip.
tensor_object = func(TFRecord)
q = multiprocessing.Manager().Queue()
q.put(tensor_object)
data = q.get()
print(data)
Comment: How do I make sure that all the processes enqueue to the same queue?
This is simply done by enqueueing the results from Pool.map(...) after all processes have finished. Alternatively, we can enqueue in parallel, queueing data from all processes. But doing so depends on the data being picklable, as described above.
For instance:
import multiprocessing as mp
import tensorflow as tf

def func(filename):
    TFRecord = read(filename)        # `read` is a user-supplied shard reader
    tensor_obj = tf.func(TFRecord)   # stands in for the actual TF preprocessing op(s)
    return tensor_obj

def main_Tensor(tensor_objs):
    # ... instantiate the graph / Session here ...
    rsq = tf.RandomShuffleQueue(...)
    for t in tensor_objs:
        rsq.enqueue(t)

if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        # Pool.map blocks until every worker has returned its result,
        # so no explicit pool.join() is needed here.
        tensor_objs = pool.map(func, sharded_TFRecords)
    main_Tensor(tensor_objs)
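For the parallel variant mentioned above, a minimal sketch (again an assumption on my part, not part of the original answer) could share a multiprocessing.Manager().Queue() proxy with the workers, since the proxy is picklable and can be passed through Pool.map; `read` is the same hypothetical shard reader as above:
import multiprocessing as mp

def func_enqueue(args):
    filename, queue = args
    data = read(filename)      # hypothetical per-shard preprocessing; result must be picklable
    queue.put(data)

if __name__ == '__main__':
    manager = mp.Manager()
    q = manager.Queue()        # one proxy queue shared by all worker processes
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        pool.map(func_enqueue, [(f, q) for f in sharded_TFRecords])
    while not q.empty():
        print(q.get())         # drain the results in the main process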
Upvotes: 1