Thibaut Loiseleur

Reputation: 824

Train a model using TensorFlow queues

I designed a neural network in TensorFlow for my regression problem by following and adapting the TensorFlow tutorial. However, due to the structure of my problem (~300,000 data points and the use of the costly FtrlOptimizer), training took too long even on my 32-CPU machine (I don't have GPUs).

According to this comment, and a quick confirmation via htop, it appears that some of my operations are single-threaded, and the culprit should be feed_dict.

Therefore, as advised here, I tried to use queues to multi-thread my program.

I wrote a simple code file that uses a queue to train a model, as follows:

import numpy as np
import tensorflow as tf
import threading

# Function run by each feeding thread: enqueues the whole dataset
def enqueue_thread():
    sess.run(enqueue_op, feed_dict={x_batch_enqueue: x, y_batch_enqueue: y})

# Number of (x, y) couples I use for "training" my model
BATCH_SIZE = 5

# Generate my data, where y = x + 1 + little_noise
x = np.random.randn(10, 1).astype('float32')
y = x + 1 + np.random.randn(10, 1) / 100

# Create the variables for my model y = x*W + b; W and b should both converge to 1
W = tf.get_variable('W', shape=[1, 1], dtype='float32')
b = tf.get_variable('b', shape=[1, 1], dtype='float32')

# Prepare the placeholders for enqueueing
x_batch_enqueue = tf.placeholder(tf.float32, shape=[None, 1])
y_batch_enqueue = tf.placeholder(tf.float32, shape=[None, 1])

# Create the queue
q = tf.RandomShuffleQueue(capacity=2**20, min_after_dequeue=BATCH_SIZE,
                          dtypes=[tf.float32, tf.float32], seed=12,
                          shapes=[[1], [1]])

# Enqueue operation
enqueue_op = q.enqueue_many([x_batch_enqueue, y_batch_enqueue])

# Dequeue operation
x_batch, y_batch = q.dequeue_many(BATCH_SIZE)

# Prediction with linear model + bias
y_pred = tf.add(tf.multiply(x_batch, W), b)

# MAE cost function
cost = tf.reduce_mean(tf.abs(y_batch - y_pred))

learning_rate = 1e-3
train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
init = tf.global_variables_initializer()
sess = tf.Session()
sess.run(init)
available_threads = 1024

# Feed the queue
for i in range(available_threads):
    threading.Thread(target=enqueue_thread).start()

# Train the model
for step in range(1000):
    _, cost_step = sess.run([train_op, cost])
    print(cost_step)
Wf = sess.run(W)
bf = sess.run(b)

This code doesn't work because each time I evaluate x_batch, a y_batch is also dequeued (and vice versa). As a result, I do not compare the features with their corresponding labels.

Is there an easy way to avoid this problem?
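
To illustrate, here is a minimal sketch of the evaluation pattern I suspect causes the mismatch (a hypothetical debugging snippet, reusing sess, x_batch and y_batch from the code above):

x_val = sess.run(x_batch)  # dequeues a batch of (x, y) pairs, keeps only the x part
y_val = sess.run(y_batch)  # dequeues ANOTHER batch, keeps only the y part
# x_val and y_val now come from two different dequeues, so they do not match up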

Upvotes: 3

Views: 577

Answers (2)

Salvador Dali

Reputation: 222531

Even though your problem is solved, I wanted to show you a small inefficiency in your code. When you created your RandomShuffleQueue, you specified capacity=2**20. For all queues, capacity is:

The upper bound on the number of elements that may be stored in this queue.

The queue will try to put as many elements as possible into itself until it hits this limit, and all of those elements sit in your RAM. With 2**20 elements, if each element is only 1 byte, the queue will eat 1 MB of memory; if you store 10 KB images in the queue, it will eat 10 GB of RAM.

This is very wasteful, especially because you never need that many elements in the queue. All you need to make sure of is that your queue is never empty. So find a reasonable capacity for the queue and do not use huge numbers.
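
For instance, a minimal sketch of a right-sized queue (the multiplier of 10 is an arbitrary illustration, not a tuned value):

import tensorflow as tf

BATCH_SIZE = 5

# Keep enough headroom above min_after_dequeue that dequeue_many never
# starves the training loop, without hoarding RAM.
capacity = 10 * BATCH_SIZE
q = tf.RandomShuffleQueue(capacity=capacity,
                          min_after_dequeue=BATCH_SIZE,
                          dtypes=[tf.float32, tf.float32],
                          shapes=[[1], [1]],
                          seed=12)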

Upvotes: 0

Thibaut Loiseleur

Reputation: 824

My mistake, everything worked fine. I was misled because, at each step of the algorithm, I estimated my performance on a different batch, and also because my model was too complicated for a dummy problem (I should have used something like y = W*x or y = x + b). Then, when I tried to print values in the console, I executed sess.run several times on different variables and, obviously, got inconsistent results.
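
In other words, each sess.run that touches x_batch or y_batch triggers its own dequeue. A minimal sketch of a consistent way to inspect intermediate values (reusing the session and ops from the question) is to fetch everything in a single call:

# One run call = one dequeue = one graph execution, so all fetched
# values are computed from the same batch and are mutually consistent.
_, cost_step, x_val, y_val = sess.run([train_op, cost, x_batch, y_batch])
print(cost_step, x_val[:2], y_val[:2])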

Upvotes: 1
