DocDriven

Reputation: 3974

How to create a multivariate timeseries dataset with tf.data?

I am trying to create an input pipeline for my LSTM model. I am using the tf.data.Dataset.from_generator API to do that. Following the guide, my current minimal example looks like this:

import tensorflow as tf

class generator:
    def __init__(self, n=5):
        self.n = n

    def __call__(self):
        for i in range(self.n):
            yield (i, 10*i)

dataset = tf.data.Dataset.from_generator(generator(), 
    output_signature=(tf.TensorSpec(shape=(), dtype=tf.uint16), tf.TensorSpec(shape=(), dtype=tf.int32)))

window_size = 3
windows = dataset.window(window_size, shift=1)

def sub_to_batch(sub):
    return sub.batch(window_size, drop_remainder=True)

final_dset = windows.flat_map(sub_to_batch)

print(list(final_dset.as_numpy_iterator()))

Error message

TypeError: tf__sub_to_batch() takes 1 positional argument but 2 were given

This problem only occurs when the generator yields more than one feature. With a single feature (i.e. after changing the following lines), the code runs without error:

yield (i)

dataset = tf.data.Dataset.from_generator(generator(), 
    output_signature=(tf.TensorSpec(shape=(), dtype=tf.uint16)))

In the version with only one feature, the output looks like this, with shape=(3, 3, 1):

[ [ [0], [1], [2] ],
  [ [1], [2], [3] ],
  [ [2], [3], [4] ]  ]

What I am trying to achieve is to zip the individual features so that I get shape=(3, 3, 2):

[ [ [0,  0], [1, 10], [2, 20] ],
  [ [1, 10], [2, 20], [3, 30] ],
  [ [2, 20], [3, 30], [4, 40] ]  ]

How can this be done?

Upvotes: 2

Views: 494

Answers (1)

AloneTogether

Reputation: 26708

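With a tuple of features, window() yields a tuple of sub-datasets for each window, and flat_map passes them to the function as separate arguments, so sub_to_batch has to accept one argument per feature. You could try something like the following; however, I am not sure how efficient it is: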

import tensorflow as tf

class generator:
    def __init__(self, n=7):
        self.n = n

    def __call__(self):
        for i in range(self.n):
            yield (i, 10*i)

dataset = tf.data.Dataset.from_generator(generator(), 
    output_signature=(tf.TensorSpec(shape=(), dtype=tf.int32), tf.TensorSpec(shape=(), dtype=tf.int32)))

window_size = 5
windows = dataset.window(window_size, shift=1)

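def stack(x, y):
  # x and y each hold one window of 5 values; turn them into columns of shape (5, 1)
  x = tf.expand_dims(x, axis=1)
  y = tf.expand_dims(y, axis=1)
  # Put the two features side by side: shape (5, 2)
  result = tf.concat((x, y), axis=1)
  # Collect three overlapping slices of length 3 from the window
  ta = tf.TensorArray(tf.int32, size=0, dynamic_size=True)
  for w in tf.range(3):
    ta = ta.write(w, result[w: w + 3])
  return ta.stack()  # shape (3, 3, 2)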

def sub_to_batch(sub1, sub2):
    # One sub-dataset per feature; batch each into a single tensor per window
    sub1 = sub1.batch(window_size, drop_remainder=True)
    sub2 = sub2.batch(window_size, drop_remainder=True)

    return tf.data.Dataset.zip((sub1, sub2)).map(stack)

final_dset = windows.flat_map(sub_to_batch)
for s in final_dset.take(1):
  print(s)

Output

tf.Tensor(
[[[ 0  0]
  [ 1 10]
  [ 2 20]]

 [[ 1 10]
  [ 2 20]
  [ 3 30]]

 [[ 2 20]
  [ 3 30]
  [ 4 40]]], shape=(3, 3, 2), dtype=int32)

You could also hard-code the slice indices if you want to; the result is the same:

def stack(x, y):
  x = tf.expand_dims(x, axis=1)
  y = tf.expand_dims(y, axis=1)
  result = tf.concat((x, y), axis=1)
  return tf.stack([result[0: 3], result[1: 4], result[2: 5]])
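
Another option, offered only as a minimal sketch and not benchmarked, is to merge the two features into one tensor before windowing. Each window is then a single sub-dataset, so the original one-argument function passed to flat_map works unchanged. The names gen, stacked and result are illustrative, and the sketch assumes both features can share one dtype (tf.int32 here) and that n=5 and window_size=3 as in the question:

```python
import numpy as np
import tensorflow as tf

def gen(n=5):
    # Same data as in the question: two scalar features per time step
    for i in range(n):
        yield (i, 10 * i)

dataset = tf.data.Dataset.from_generator(
    gen,
    output_signature=(
        tf.TensorSpec(shape=(), dtype=tf.int32),
        tf.TensorSpec(shape=(), dtype=tf.int32),
    ),
)

window_size = 3

# Merge the features first, so every element is one tensor of shape (2,)
stacked = dataset.map(lambda x, y: tf.stack([x, y]))

# Each window is now a single sub-dataset rather than a tuple of them
windows = stacked.window(window_size, shift=1, drop_remainder=True)
final_dset = windows.flat_map(
    lambda sub: sub.batch(window_size, drop_remainder=True)
)

# Every element of final_dset has shape (window_size, 2)
result = np.stack(list(final_dset.as_numpy_iterator()))
print(result.shape)  # (3, 3, 2)
```

Here each dataset element is one window of shape (3, 2), and the three windows together give the (3, 3, 2) layout asked for in the question. If the features need different dtypes, they would have to be cast to a common one before tf.stack.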

Upvotes: 3
