Ujjwal

Reputation: 1859

Dequeueing from RandomShuffleQueue does not reduce size

In order to train a model, I have encapsulated it in a class. I use a tf.RandomShuffleQueue to enqueue a list of filenames. However, when I dequeue elements they do get dequeued, but the size of the queue does not decrease.

Here are my specific questions, followed by the code snippet:

  1. If I have only 5 images, for example, but the steps range up to 100, would this result in addfilenames being called repeatedly and automatically? Dequeuing does not give me any error, so I am thinking that it is getting called automatically.
  2. Why does the size of the tf.RandomShuffleQueue not change? It remains constant.

    import os
    import time
    import functools
    import tensorflow as tf
    from Read_labelclsloc import readlabel
    
    
    def ReadTrain(traindir):
        # Returns a list of training images, their labels and a dictionary.
        # The dictionary maps label names to integer numbers.
        return trainimgs, trainlbls, classdict
    
    
    def ReadVal(valdir, classdict):
        # Reads the validation image labels.
        # Returns a dictionary with filenames as keys and
        # corresponding labels as values.
        return valdict
    
    def lazy_property(function):
        # Just a decorator to make sure that on repeated calls to
        # member functions, ops don't get created repeatedly.
        # Acknowledgements: https://danijar.com/structuring-your-tensorflow-models/
        attribute = '_cache_' + function.__name__
        @property
        @functools.wraps(function)
        def decorator(self):
            if not hasattr(self, attribute):
                setattr(self, attribute, function(self))
            return getattr(self, attribute)
    
        return decorator    
    
    class ModelInitial:
    
        def __init__(self, traindir, valdir):
            self.graph
            self.traindir = traindir
            self.valdir = valdir
            self.traininginfo()
            self.epoch = 0
    
    
    
        def traininginfo(self):
            self.trainimgs, self.trainlbls, self.classdict = ReadTrain(self.traindir)
            self.valdict = ReadVal(self.valdir, self.classdict)
            with self.graph.as_default():
                self.trainimgs_tensor = tf.constant(self.trainimgs)
                self.trainlbls_tensor = tf.constant(self.trainlbls, dtype=tf.uint16)
                self.trainimgs_dict = {}
                self.trainimgs_dict["ImageFile"] = self.trainimgs_tensor
            return None
    
        @lazy_property
        def graph(self):
            g = tf.Graph()
            with g.as_default():
                # Layer definitions go here
                pass
            return g
    
    
        @lazy_property
        def addfilenames(self):
            # This is the function where filenames are pushed to a RandomShuffleQueue
            filename_queue = tf.RandomShuffleQueue(capacity=len(self.trainimgs), min_after_dequeue=0,
                                                   dtypes=[tf.string], names=["ImageFile"],
                                                   seed=0, name="filename_queue")
    
            sz_op = filename_queue.size()
    
            dq_op = filename_queue.dequeue()
    
            enq_op = filename_queue.enqueue_many(self.trainimgs_dict)
            return filename_queue, enq_op, sz_op, dq_op
    
        def Train(self):
            # The function for training.
            # I have not written the training part yet.
            # Still struggling with preprocessing.
            with self.graph.as_default():
                filename_q, filename_enqueue_op, sz_op, dq_op = self.addfilenames
    
                qr = tf.train.QueueRunner(filename_q, [filename_enqueue_op])
                filename_dequeue_op = filename_q.dequeue()
                init_op = tf.global_variables_initializer()
    
            sess = tf.Session(graph=self.graph)
            sess.run(init_op)
            coord = tf.train.Coordinator()
            enq_threads = qr.create_threads(sess, coord=coord, start=True)
            counter = 0
            for step in range(100):
                print(sess.run(dq_op["ImageFile"]))
                print("Epoch = %d "%(self.epoch))
                print("size = %d"%(sess.run(sz_op)))
                counter+=1
    
            names = [n.name for n in self.graph.as_graph_def().node]
            coord.request_stop()
            coord.join(enq_threads)
            print("Counter = %d"%(counter))
            return None
    
    
    
    
    
    if __name__ == "__main__":
        modeltrain = ModelInitial(<Path to training images>,\
                                        <Path to validation images>)
        a = modeltrain.graph
        print(a)
        modeltrain.Train()
        print("Success")
    

Upvotes: 0

Views: 114

Answers (1)

mrry

Reputation: 126184

The mystery is caused by the tf.train.QueueRunner that you created for the queue, which causes it to be filled in the background.

  1. The following lines cause a background "queue runner" thread to be created:

    qr = tf.train.QueueRunner(filename_q, [filename_enqueue_op])
    # ... 
    enq_threads = qr.create_threads(sess, coord=coord, start=True)
    

    This thread calls filename_enqueue_op in a loop, which causes the queue to be filled up as you remove elements from it.

  2. The background thread from step 1 will almost always have a pending enqueue operation (filename_enqueue_op) on the queue. This means that after you dequeue a filename, the pending enqueue will run and fill the queue back up to capacity. (Technically there is a race condition here, and you could see a size of capacity - 1, but this is quite unlikely.) You can verify this by filling the queue once yourself instead of starting a queue runner, as in the sketch below.
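
For illustration, here is a minimal sketch (assuming the TF 1.x queue API; the filenames are hypothetical stand-ins for your trainimgs list) that runs the enqueue op once in the main thread instead of via a QueueRunner. With no background thread topping the queue up, size() decreases after every dequeue:

    import tensorflow as tf

    # Hypothetical filenames, stand-ins for self.trainimgs.
    filenames = ["img0.jpg", "img1.jpg", "img2.jpg", "img3.jpg", "img4.jpg"]

    q = tf.RandomShuffleQueue(capacity=len(filenames), min_after_dequeue=0,
                              dtypes=[tf.string], names=["ImageFile"],
                              seed=0, name="filename_queue")
    enq_op = q.enqueue_many({"ImageFile": tf.constant(filenames)})
    dq_op = q.dequeue()
    sz_op = q.size()

    with tf.Session() as sess:
        sess.run(enq_op)  # fill the queue exactly once; no queue-runner thread
        for _ in range(len(filenames)):
            print(sess.run(dq_op["ImageFile"]))
            # Nothing refills the queue, so the size drops: 4, 3, 2, 1, 0
            print("size = %d" % sess.run(sz_op))

The only structural difference from your Train() is that the enqueue runs once in the main thread rather than continuously in a queue-runner thread, so the dequeues are not immediately replenished.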

Upvotes: 1
