siby
siby

Reputation: 777

Dataset API 'flat_map' method producing error for same code which works with 'map' method

I am trying to create a create a pipeline to read multiple CSV files using TensorFlow Dataset API and Pandas. However, using the flat_map method is producing errors. However, if I am using map method I am able to build the code and run it in session. This is the code I am using. I already opened #17415 issue in TensorFlow Github repository. But apparently, it is not an error and they asked me to post here.

folder_name = './data/power_data/'
file_names = os.listdir(folder_name)
def _get_data_for_dataset(file_name,rows=100):#
    print(file_name.decode())

    df_input=pd.read_csv(os.path.join(folder_name, file_name.decode()),
                         usecols =['Wind_MWh','Actual_Load_MWh'],nrows = rows)
    X_data = df_input.as_matrix()
    X_data.astype('float32', copy=False)

    return X_data
dataset = tf.data.Dataset.from_tensor_slices(file_names)
dataset = dataset.flat_map(lambda file_name: tf.py_func(_get_data_for_dataset, 
[file_name], tf.float64))
dataset= dataset.batch(2)
fiter = dataset.make_one_shot_iterator()
get_batch = iter.get_next()

I get the following error: map_func must return a Dataset object. The pipeline works without error when I use map but it doesn't give the output I want. For example, if Pandas is reading N rows from each of my CSV files I want the pipeline to concatenate data from B files and give me an array with shape (N*B, 2). Instead, it is giving me (B, N,2) where B is the Batch size. map is adding another axis instead of concatenating on the existing axis. From what I understood in the documentation flat_map is supposed to give a flatted output. In the documentation, both map and flat_map returns type Dataset. So how is my code working with map and not with flat_map?

It would also great if you could point me towards code where Dataset API has been used with Pandas module.

Upvotes: 2

Views: 1867

Answers (1)

mrry
mrry

Reputation: 126154

As mikkola points out in the comments, the Dataset.map() and Dataset.flat_map() expect functions with different signatures: Dataset.map() takes a function that maps a single element of the input dataset to a single new element, whereas Dataset.flat_map() takes a function that maps a single element of the input dataset to a Dataset of elements.

If you want each row of the array returned by _get_data_for_dataset() to become a separate element, you should use Dataset.flat_map() and convert the output of tf.py_func() to a Dataset, using Dataset.from_tensor_slices():

folder_name = './data/power_data/'
file_names = os.listdir(folder_name)

def _get_data_for_dataset(file_name, rows=100):
    df_input=pd.read_csv(os.path.join(folder_name, file_name.decode()),
                         usecols=['Wind_MWh', 'Actual_Load_MWh'], nrows=rows)
    X_data = df_input.as_matrix()
    return X_data.astype('float32', copy=False)

dataset = tf.data.Dataset.from_tensor_slices(file_names)

# Use `Dataset.from_tensor_slices()` to make a `Dataset` from the output of 
# the `tf.py_func()` op.
dataset = dataset.flat_map(lambda file_name: tf.data.Dataset.from_tensor_slices(
    tf.py_func(_get_data_for_dataset, [file_name], tf.float32)))

dataset = dataset.batch(2)

iter = dataset.make_one_shot_iterator()
get_batch = iter.get_next()

Upvotes: 9

Related Questions