Reputation: 4143
I want to train a TensorFlow neural network on a large dataset of 100 million rows with around 15,000 features per row. Training on a single machine would likely be too slow, so I want to run it distributed.
All the distributed examples I've seen so far start by loading the whole dataset into memory and then sending it to the slaves, which would be too expensive in my case.
Does anyone know how to set up the slaves so they stream in their training data? Currently the data is stored in Google Cloud Storage, but we can be flexible about this.
Upvotes: 5
Views: 3325
Reputation: 281
This can be done in two steps:
1) Index your input data based on task_id (i.e. the worker number).
2) Encapsulate the data retrieval in the tf.device scope (similar to inception_distributed_train.py:113).
(This assumes you are using appropriate feeders to open an iterator over your data.)
Example: 1 parameter server, 4 workers, 1 input data table
Inside the tf.device scope for each worker, open a handle to your data and make sure each worker reads a unique set of rows: with n workers, start each worker's iterator at row task_id and advance it by n on every read. In the example setup the 4 workers all start at the same base row number (0) plus their task_id, so worker 0 (task_id=0) reads row 0 first, worker 1 (task_id=1) reads row 1 first, and so on. As long as each worker advances its row iterator by the number of workers, every worker will independently pull unique rows. A sketch of this scheme follows.
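As a rough illustration of that indexing scheme (this is only a sketch; open_table, num_workers and task_id stand in for your own data reader and cluster configuration, they are not a specific TensorFlow API):
def rows_for_worker(open_table, num_workers, task_id):
    """Yield rows task_id, task_id + num_workers, task_id + 2*num_workers, ...

    so that num_workers workers together cover every row exactly once.
    open_table is whatever opens an iterator over your input table.
    """
    for row_number, row in enumerate(open_table()):
        if row_number % num_workers == task_id:
            yield row

# With 4 workers: worker 0 reads rows 0, 4, 8, ...; worker 1 reads rows 1, 5, 9, ...; etc.
# As in inception_distributed_train.py, the ops consuming these rows would be
# built inside that worker's tf.device scope.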
Upvotes: 0
Reputation: 4451
Although I have never tried it with that many samples (just out of interest, what dataset are you training on?), I think you should use QueueRunner objects!
They can be found on this page: https://www.tensorflow.org/programmers_guide/reading_data , in the section "Creating threads to prefetch using QueueRunner objects".
A quote on how they work:
The short version: many of the tf.train functions listed above add tf.train.QueueRunner objects to your graph. These require that you call tf.train.start_queue_runners before running any training or inference steps, or it will hang forever. This will start threads that run the input pipeline, filling the example queue so that the dequeue to get the examples will succeed. This is best combined with a tf.train.Coordinator to cleanly shut down these threads when there are errors.
The recommended code pattern from that page for putting this together is:
# Create the graph, etc.
init_op = tf.global_variables_initializer()

# Create a session for running operations in the Graph.
sess = tf.Session()

# Initialize the variables (like the epoch counter).
sess.run(init_op)

# Start input enqueue threads.
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess, coord=coord)

try:
    while not coord.should_stop():
        # Run training steps or whatever
        sess.run(train_op)

except tf.errors.OutOfRangeError:
    print('Done training -- epoch limit reached')
finally:
    # When done, ask the threads to stop.
    coord.request_stop()

# Wait for threads to finish.
coord.join(threads)
sess.close()
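For completeness, here is a rough sketch of what the input-pipeline side (the part the queue runner threads actually execute) could look like when streaming TFRecord shards straight from Google Cloud Storage. The bucket path, feature names and batch sizes below are made-up placeholders, not something from your setup:
import tensorflow as tf

# Hypothetical file pattern; TensorFlow can read gs:// paths directly.
filenames = tf.gfile.Glob('gs://my-bucket/training-data/part-*.tfrecord')

# The filename queue is served by a QueueRunner added to the graph here.
# Note: when num_epochs is set, tf.local_variables_initializer() must also be run.
filename_queue = tf.train.string_input_producer(filenames, num_epochs=10, shuffle=True)

reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)

# Placeholder feature spec: one dense float vector plus an integer label per row.
example = tf.parse_single_example(
    serialized_example,
    features={
        'features': tf.FixedLenFeature([15000], tf.float32),
        'label': tf.FixedLenFeature([], tf.int64),
    })

# shuffle_batch adds another QueueRunner; its threads keep the example queue
# filled so training steps rarely wait on storage.
feature_batch, label_batch = tf.train.shuffle_batch(
    [example['features'], example['label']],
    batch_size=128,
    capacity=2000,
    min_after_dequeue=1000,
    num_threads=4)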
Although Stack Overflow prefers complete explanations over links to relevant pages, there is much more information on the page I linked above!
Would love to know if this solved your problem, and good luck!
Upvotes: 1