Tara Eicher
Tara Eicher

Reputation: 168

Troubleshooting TensorFlow input pipeline for CSV file queue

I'm new to TensorFlow and am training a SOM model based on Sachin Joglekar's blog (https://codesachin.wordpress.com/2015/11/28/self-organizing-maps-with-googles-tensorflow/) using multiple CSV files as input. I followed the tutorial at https://www.tensorflow.org/programmers_guide/reading_data for reading from a CSV file queue in minibatches. My code is running, but I want to print out the decoded CSV input from the reader to verify that the input pipeline is working correctly. Since the CSV file input is not part of the graph, I am unable to print it using Tensor.eval(self.sess). When I try to print out the decoded record labels using self.label.eval(session = tf.Session(graph=self.label.graph)), my script hangs and does not give any output. Is there a way for me to verify that my input pipeline is working? Here are the relevant snippets of my code:

Main function

def main(argv):
    # Build the SOM for the current window size, then run training on it.
    model = SOM(
        somDim1,
        somDim2,
        windowSizes[win],
        iterations,
        learningRate,
        neighborhood,
        fileNameList,
        batchSize,
    )
    model.train(batchSize, fileNameList, windowSizes[win])

Graph

def __init__(self, m, n, dim, iterations, alpha, sigma, fileNameList, batchSize):
    """Build the SOM graph and the session that will run it.

    m, n         -- SOM lattice dimensions.
    dim          -- dimensionality of each input vector.
    iterations   -- number of training iterations.
    alpha, sigma -- initial learning rate and neighborhood radius.
    fileNameList -- CSV files supplying the training data.
    batchSize    -- minibatch size.
    """

    ##INITIALIZE GRAPH
    self.graph = tf.Graph()

    ##POPULATE GRAPH WITH NECESSARY COMPONENTS
    with self.graph.as_default():

        ##PLACEHOLDERS FOR TRAINING INPUTS
        #These should be placeholders according to the TensorFlow framework,
        #but we are declaring them as variables so that we can assign them directly
        #to values read in from the CSV files.
        #NOTE(review): tf.cast over a NumPy array yields CONSTANT tensors, not
        #variables or placeholders -- they cannot be assigned new values after
        #graph construction, so rebinding self.label/self.batchInput later in
        #train() never feeds data into trainingOp (this matches the fix in the
        #accepted answer: build the input pipeline here instead).
        batchInputLg = np.zeros((dim, batchSize))
        labelFloat = np.zeros((3, batchSize))
        self.label = tf.cast(labelFloat, "string")
        self.batchInput = tf.cast(batchInputLg, "float32")

        """
        ...the rest of the graph...
        """

        # trainingOp writes the updated weight vectors back into the SOM's
        # weight variable; newWeightagesOp is defined in the elided graph code.
        self.trainingOp = tf.assign(self.weightageVects, newWeightagesOp)

        ##INITIALIZE SESSION
        self.sess = tf.Session()

        ##INITIALIZE VARIABLES
        initOp = tf.global_variables_initializer()
        self.sess.run(initOp)

Input pipeline functions

"""
Read in the features and metadata from the CSV files for each chromosome.
"""
def readFromCsv(self, fileNameQ, dim):
    reader = tf.TextLineReader()
    _, csvLine = reader.read(fileNameQ)
    recordDefaults = [["\0"] for cl in range(dim - 1)]
    recordStr = tf.decode_csv(csvLine, record_defaults=recordDefaults)
    self.label = tf.stack(recordStr[0:2])
    #self.label.eval(session = tf.Session(graph=self.label.graph))
    self.features = tf.to_float(tf.stack(recordStr[3:dim - 1]))
    return (self.features, self.label)

"""
Read in the features and metadata from the CSV files for each chromosome.
"""
def inputPipeline(self, batchSize, fileNameList, dim, num_epochs=None):
    fileNameQ = tf.train.string_input_producer(fileNameList, shuffle = True)  
    minAfterDequeue = 10000
    capacity = minAfterDequeue + 3 * batchSize
    example, label = self.readFromCsv(fileNameQ, dim)
    exampleBatchStr, labelBatch = tf.train.shuffle_batch([example, label], batch_size=batchSize, capacity=capacity, min_after_dequeue=minAfterDequeue)
    exampleBatch = tf.cast(exampleBatchStr, "float")
    return (exampleBatch, labelBatch)

Training function

def train(self, batchSize, fileNameList, dim):
    """Run the training loop, pulling minibatches from the CSV pipeline.

    NOTE(review): this is the version the question reports as hanging; the
    accepted answer's fix is to build the input pipeline once inside the
    graph rather than per-iteration here.
    """
    #Start the queue runners.
    # Start input enqueue threads.
    # NOTE(review): two coordinators and two sets of queue-runner threads
    # are started here; one of each suffices.
    coordFile = tf.train.Coordinator()
    self.coord = tf.train.Coordinator()
    threadsFile = tf.train.start_queue_runners(sess=self.sess, coord=coordFile)
    self.threads = tf.train.start_queue_runners(sess=self.sess, coord=self.coord)

    #Training iterations
    self.iterationInput = 0
    try:
        for iter in range(self.iterations):
            #Train with each vector one by one
            self.iterationInput += 1
            while not self.coord.should_stop():
                #Fill in input data.
                # NOTE(review): this adds NEW pipeline ops to the graph on
                # every pass and merely rebinds the Python attributes -- the
                # decoded batch never reaches self.trainingOp, which was
                # built against the constant tensors from __init__.
                [self.batchInput, self.label] = self.inputPipeline(batchSize, fileNameList, dim)
                self.sess.run(self.trainingOp)

    except tf.errors.OutOfRangeError:
        print('Done training -- epoch limit reached')

    # When done, ask the threads to stop.
    # NOTE(review): coord.join(threads) is never called, and coordFile is
    # never asked to stop.
    self.coord.request_stop()   

Upvotes: 0

Views: 257

Answers (1)

Tara Eicher
Tara Eicher

Reputation: 168

I figured out the solution. Instead of initializing the label and batch input tensors in the graph and assigning them within the train() function, I should have placed the assignment statement inside the graph, like so:

##TRAINING INPUTS
#Build the pipeline ops once, inside the graph, so the ops that consume
#self.batchInput and self.label (e.g. self.trainingOp) are wired directly
#to the decoded CSV batches.
self.batchInput, self.label = self.inputPipeline(batchSize, fileNameList, dim)

Then, the train function becomes:

def train(self, batchSize, fileNameList, dim):
    """Run the training loop against the pipeline built inside the graph.

    Starts the queue-runner threads, repeatedly evaluates the input batch
    and the training op, and prints the decoded labels so the pipeline can
    be verified. Stops cleanly when the epoch limit raises OutOfRangeError.
    """
    with self.sess:
        # Start populating the filename queue.
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord=coord)

        #Training iterations
        self.iterationInput = 0
        try:
            for iter in range(self.iterations):
                #Train with each vector one by one
                self.iterationInput += 1
                while not coord.should_stop():
                    #Fill in input data.
                    self.sess.run([self.batchInput, self.label])
                    self.sess.run(self.trainingOp)
                    # Call form works on both Python 2 and 3; the original
                    # bare "print x" statement was Python-2-only syntax and
                    # inconsistent with the print('...') call below.
                    # NOTE(review): this eval() runs the pipeline again, so
                    # it prints a DIFFERENT batch than the one just trained
                    # on -- fetch the label in the sess.run above to see the
                    # same batch.
                    print(self.label.eval(session = self.sess))
        except tf.errors.OutOfRangeError:
            print('Done training -- epoch limit reached')

        # When done, ask the threads to stop.
        coord.request_stop()
        coord.join(threads)

Upvotes: 1

Related Questions