Ujjwal

Reputation: 1859

Enqueuing a tf.RandomShuffleQueue from multiple processes using multiprocessing

I would like to use multiple processes (not threads) to do some preprocessing and enqueue the results to a tf.RandomShuffleQueue which can be used by my main graph for training.

Is there a way to do that?

My actual problem

I have converted my dataset into TFRecords split across 256 shards. I want to start 20 processes using multiprocessing and have each process handle a range of shards. Each process should read images, augment them, and push them into a tf.RandomShuffleQueue from which the input can be fed to a graph for training.

Some people advised me to go through the Inception example in TensorFlow. However, that is a very different situation, because there only the reading of the data shards is done by multiple threads (not processes), while the preprocessing (e.g. augmentation) takes place in the main thread.

Upvotes: 5

Views: 694

Answers (3)

MWB

Reputation: 12577

(This aims to solve your actual problem)

In another topic, someone told you that Python has the global interpreter lock (GIL) and therefore there would be no speed benefits from multi-core, unless you used multiple processes.

This was probably what prompted your desire to use multiprocessing.

However, with TF, Python is normally used only to construct the "graph". The actual execution happens in native code (or on the GPU), where the GIL plays no role whatsoever.

In light of this, I recommend simply letting TF use multithreading. This can be controlled using the intra_op_parallelism_threads argument, such as:

with tf.Session(graph=graph,
                config=tf.ConfigProto(allow_soft_placement=True,
                                      intra_op_parallelism_threads=20)) as sess:
    # ...
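(For context: intra_op_parallelism_threads sets the size of the thread pool TF uses to parallelize work inside a single op, while the related inter_op_parallelism_threads option controls how many ops may run concurrently; leaving either at 0 lets TF pick a value based on the available cores.)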

(Side note: if you have, say, a 2-CPU, 32-core system, the best setting may very well be intra_op_parallelism_threads=16, depending on a lot of factors)

Upvotes: 1

noxdafox

Reputation: 15050

It seems the recommended way to run TF with multiprocessing is to create a separate tf.Session for each child process, since sharing one across processes is infeasible.

You can take a look at this example; I hope it helps.
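Not the linked example itself, but a minimal sketch of that pattern; build_graph() and the shard lists below are hypothetical placeholders:

import multiprocessing as mp

import tensorflow as tf


def worker(shard_paths):
    # Each child process builds its own graph and owns its own Session;
    # nothing TF-related is shared with the parent process.
    with tf.Graph().as_default():
        # build_graph() is a hypothetical helper that wires up reading and
        # augmentation for the given shards and returns the op(s) to fetch.
        fetch_op = build_graph(shard_paths)
        with tf.Session() as sess:
            return sess.run(fetch_op)


if __name__ == '__main__':
    shards = [['shard-000', 'shard-001'], ['shard-002', 'shard-003']]  # placeholders
    with mp.Pool(processes=2) as pool:
        results = pool.map(worker, shards)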

[EDIT: Old answer]

You can use a multiprocessing.Pool and rely on its callback mechanism to put results in the tf.RandomShuffleQueue as soon as they are ready.

Here's a very simple example on how to do it.

from multiprocessing import Pool


class Processor(object):
    def __init__(self, random_shuffle_queue):
        self.queue = random_shuffle_queue
        self.pool = Pool()

    def schedule_task(self, task):
        # processing_function (defined elsewhere) runs in a worker process;
        # task_done runs back in the parent once the result is ready.
        self.pool.apply_async(processing_function, args=[task], callback=self.task_done)

    def task_done(self, results):
        # Push the preprocessed result into the tf.RandomShuffleQueue.
        self.queue.enqueue(results)

This assumes Python 2; for Python 3 I'd recommend using a concurrent.futures.ProcessPoolExecutor.
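A rough Python 3 equivalent, as a sketch only (processing_function and the queue wiring are assumed, as above):

from concurrent.futures import ProcessPoolExecutor


class Processor(object):
    def __init__(self, random_shuffle_queue):
        self.queue = random_shuffle_queue
        self.executor = ProcessPoolExecutor()

    def schedule_task(self, task):
        future = self.executor.submit(processing_function, task)
        # add_done_callback plays the same role as Pool's callback argument.
        future.add_done_callback(self._task_done)

    def _task_done(self, future):
        # future.result() re-raises any exception from the worker process.
        self.queue.enqueue(future.result())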

Upvotes: 0

stovfl

Reputation: 15533

Comment: The pickling of TFRecords is not that important. I can pass a list of lists containing names of ranges of sharded TFRecord files.

Therefore I have to restart the decision process!

Comment: I can pass it to a Pool.map() as an argument.

Verify whether a multiprocessing.Queue() can handle this.
The results of Tensor functions are Tensor objects.
Try the following:

import multiprocessing

tensor_object = func(TFRecord)           # result of the per-shard processing
q = multiprocessing.Manager().Queue()    # a queue that can be shared across processes
q.put(tensor_object)                     # this fails if tensor_object is not picklable
data = q.get()
print(data)

Comment: How do I make sure that all the processes enqueue to the same queue?

The simplest way is to enqueue the results from Pool.map(... after all processes have finished.
Alternatively, we can enqueue in parallel, queueing data from all processes as it becomes available (a sketch of that alternative follows the example below).

But doing so depends on the data being picklable, as described above.


For instance:

import multiprocessing as mp

import tensorflow as tf


def func(filename):
    # read() and tf.func() are placeholders for the real per-shard work.
    TFRecord = read(filename)
    tensor_obj = tf.func(TFRecord)
    return tensor_obj

def main_Tensor(tensor_objs):
    # ... instantiate the TensorFlow Session here
    rsq = tf.RandomShuffleQueue(...)
    for t in tensor_objs:
        rsq.enqueue(t)

if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    with mp.Pool(20) as pool:
        # Pool.map blocks until every worker has returned its result,
        # so no explicit join is needed here.
        tensor_objs = pool.map(func, sharded_TFRecords)

    main_Tensor(tensor_objs)
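And a sketch of the parallel alternative mentioned above, assuming the per-shard results are picklable; the shared Manager queue, read_and_augment() helper, and queue shapes/dtypes are illustrative placeholders:

import multiprocessing as mp

import tensorflow as tf


def func(filename, shared_q):
    # Worker: preprocess one shard and push the (picklable) result
    # into a queue shared with the main process.
    result = read_and_augment(filename)  # hypothetical preprocessing helper
    shared_q.put(result)


if __name__ == '__main__':
    sharded_TFRecords = ['file1', 'file2']
    manager = mp.Manager()
    shared_q = manager.Queue()

    with mp.Pool(20) as pool:
        pool.starmap(func, [(f, shared_q) for f in sharded_TFRecords])

    # Main process: drain the shared queue into the tf.RandomShuffleQueue.
    rsq = tf.RandomShuffleQueue(capacity=1000, min_after_dequeue=100,
                                dtypes=[tf.float32])
    item_ph = tf.placeholder(tf.float32)
    enqueue_op = rsq.enqueue(item_ph)

    with tf.Session() as sess:
        while not shared_q.empty():
            sess.run(enqueue_op, feed_dict={item_ph: shared_q.get()})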

Upvotes: 1
