soloice

Reputation: 1040

Using multiple input pipelines in TensorFlow

I know how to use an input pipeline to read data from files:

input = ... # Read from file
loss = network(input) # build a network
train_op = ... # Using SGD or other algorithms to train the network.

But how can I switch between multiple input pipelines? Say I want to train the network for 1000 batches on the training set (fed by the training pipeline), then validate it on a validation set (fed by another pipeline), then keep training, then validate again, and so forth.

It's easy to implement this with feed_dict. I also know how to use checkpoints to achieve it, just like in the CIFAR-10 example, but that's kind of cumbersome: I have to dump the model to disk and then read it back from disk again.
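For reference, the feed_dict version looks roughly like this (x, y_, train_data and val_data are just illustrative names for the placeholders and Python-side readers):

# rough sketch of the feed_dict version; x, y_ are placeholders and
# train_data / val_data are Python-side readers (names are just illustrative)
for step in range(num_steps):
    batch_x, batch_y = train_data.next_batch(batch_size)
    sess.run(train_op, feed_dict={x: batch_x, y_: batch_y})
    if step % 1000 == 0:  # validate every 1000 batches
        val_x, val_y = val_data.next_batch(batch_size)
        print(sess.run(loss, feed_dict={x: val_x, y_: val_y}))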

Can I just switch between two input pipelines (one for training data, one for validation data) to achieve this, i.e. read 1000 batches from the training data queue, then a few batches from the validation data queue, and so on? If it is possible, how can I do it?

Upvotes: 2

Views: 1826

Answers (1)

VS_FF

Reputation: 2363

Not sure if this is exactly what you are looking for, but I do training and validation in the same code with two separate loops. My code reads numeric and string data from .CSV files, not images, and it reads from two separate CSV files, one for training and one for validation. I'm sure you can generalize it to read from two 'sets' of files rather than single files, since the code for that is already there.

Here are the code snippets in case it helps. Note that this code first reads everything as strings and then converts the necessary cells into floats, just because of my own requirements; if your data is purely numeric, just set the defaults to floats and everything gets easier. Also, there are a couple of lines in there that dump the weights and biases into a CSV file AND serialize them into a TF checkpoint file, depending on which way you'd prefer.

import numpy as np
import tensorflow as tf

# first define the defaults (everything is read as a string here):
rDefaults = [['a'] for row in range(TD + TS + TL)]

# this function reads line-by-line from the CSV and splits each line into cells:
def read_from_csv(filename_queue):
    reader = tf.TextLineReader(skip_header_lines=False)
    _, csv_row = reader.read(filename_queue)
    data = tf.decode_csv(csv_row, record_defaults=rDefaults)
    dateLbl = tf.slice(data, [0], [TD])                                       # meta-data columns
    features = tf.string_to_number(tf.slice(data, [TD], [TS]), tf.float32)    # feature columns
    label = tf.string_to_number(tf.slice(data, [TD + TS], [TL]), tf.float32)  # label columns
    return dateLbl, features, label

# this function loads the above lines and spits them out as batches of N:
def input_pipeline(fName, batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer(
        [fName],
        num_epochs=num_epochs,
        shuffle=True)
    dateLbl, features, label = read_from_csv(filename_queue)
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size  # max of how much to load into memory
    dateLbl_batch, feature_batch, label_batch = tf.train.shuffle_batch(
        [dateLbl, features, label],
        batch_size=batch_size,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return dateLbl_batch, feature_batch, label_batch

# these are the TRAINING features, labels, and meta-data, loaded from the train file:
dateLbl, features, labels = input_pipeline(fileNameTrain, batch_size, try_epochs)
# these are the TESTING features, labels, and meta-data, loaded from the test file:
dateLblTest, featuresTest, labelsTest = input_pipeline(fileNameTest, batch_size, 1)  # 1 epoch here regardless of training

# then you define the model, start the session, blah blah

# fire up the queue runners:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

# this is the TRAINING loop:
try:
    while not coord.should_stop():
        # pull one batch from the training queue
        dateLbl_batch, feature_batch, label_batch = sess.run([dateLbl, features, labels])

        _, acc, summary = sess.run([train_step, accuracyTrain, merged_summary_op],
                                   feed_dict={x: feature_batch,
                                              y_: label_batch,
                                              keep_prob: dropout,
                                              learning_rate: lRate})

except tf.errors.OutOfRangeError:  # raised once the training file(s) are exhausted
    # by the way, this dumps the weights into a CSV file, since you asked for that:
    np.savetxt(fPath + fIndex + '_weights.csv', sess.run(W))
    # and this serializes the weights and biases into the TF-formatted checkpoint:
    # tf.train.Saver({'varW': W, 'varB': b}).save(sess, fileNameCheck)

finally:
    coord.request_stop()

# now re-start the runners for the testing file:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

i = 0
acCumTest = 0.0  # running total of per-batch test accuracy

try:
    while not coord.should_stop():
        # this reads features, labels, and meta-data again, but this time from the TESTING file:
        dateLbl_batch, feature_batch, label_batch = sess.run([dateLblTest, featuresTest, labelsTest])

        guessY = tf.argmax(y, 1).eval({x: feature_batch, keep_prob: 1})
        trueY = tf.argmax(label_batch, 1).eval()

        accuracy = round(tf.reduce_mean(tf.cast(tf.equal(guessY, trueY), tf.float32)).eval(), 2)
        acCumTest += accuracy
        i += 1

except tf.errors.OutOfRangeError:
    acCumTest /= i  # average test accuracy over all batches
finally:
    coord.request_stop()

coord.join(threads)

This may differ from what you are trying to do in the sense that it first completes the training loop and THEN restarts the queues for the testing loop. Not sure how you'd do this if you want to go back and forth, but you can try to experiment with the two functions defined above by passing them the relevant file names (or lists) interchangeably; a rough sketch of one way to interleave them follows below.
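For what it's worth, here is an untested sketch of how the two pipelines above might be interleaved without restarting the queues. The idea is to pass num_epochs=None to both input_pipeline calls so that neither queue ever runs out, start the queue runners once, and then simply sess.run whichever batch tensors you need at each step. Names like total_steps and numValBatches (and the model tensors x, y_, train_step, accuracyTrain) are assumptions standing in for the model code that isn't shown here:

# untested sketch: with num_epochs=None neither queue raises OutOfRangeError,
# so we can alternate between them freely in a single loop
dateLbl, features, labels = input_pipeline(fileNameTrain, batch_size)             # training queue
dateLblTest, featuresTest, labelsTest = input_pipeline(fileNameTest, batch_size)  # validation queue

coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)

try:
    for step in range(total_steps):
        # pull one batch from the TRAINING queue and take one optimization step
        _, feature_batch, label_batch = sess.run([dateLbl, features, labels])
        sess.run(train_step, feed_dict={x: feature_batch, y_: label_batch,
                                        keep_prob: dropout, learning_rate: lRate})

        # every 1000 steps, pull a few batches from the VALIDATION queue
        if step % 1000 == 0:
            for _ in range(numValBatches):
                _, val_features, val_labels = sess.run([dateLblTest, featuresTest, labelsTest])
                print(sess.run(accuracyTrain, feed_dict={x: val_features, y_: val_labels,
                                                         keep_prob: 1.0}))
finally:
    coord.request_stop()
    coord.join(threads)

Since the validation producer never reaches the end of an epoch count in this sketch, there is no need to save/restore the model or re-start the runners when switching between the two phases.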

Also, I'm not sure whether re-starting the queues after training is the best way to go, but it works for me. Would love to see a better example out there, as most TF examples use some built-in wrappers around the MNIST dataset to do the training in one go...

Upvotes: 2
