Pleastry

Reputation: 434

PyTorch DataLoader for reading a large parquet/csv file

I am trying to train a PyTorch model on the records of a single parquet file without reading the entire file into memory at once, since it won't fit in memory. Because the file is stored remotely, I would rather keep it as a single file; training with IO against many small files is extremely expensive. How can I use PyTorch's IterableDataset or Dataset to read smaller chunks of the file during training, while still being able to specify the number of batches in the DataLoader? I know that a map-style Dataset won't work directly here, since everything is in one file rather than one file per index.

I managed to implement this in TensorFlow using tfio.IODataset and tf.data.Dataset, but I can't find an equivalent way to implement it in PyTorch.
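To make the goal concrete, here is a rough, untested sketch of the kind of chunked reading I have in mind, assuming pyarrow is available (the file name, chunk size, and the assumption that all columns are numeric are placeholders, not part of my actual setup):

import pyarrow.parquet as pq
import torch
from torch.utils.data import IterableDataset, DataLoader

class ParquetChunkDataset(IterableDataset):
    def __init__(self, path, chunk_size):
        self.path = path
        self.chunk_size = chunk_size

    def __iter__(self):
        pf = pq.ParquetFile(self.path)
        # iter_batches reads the file incrementally, one record batch at a time,
        # so the whole file is never loaded into memory
        for record_batch in pf.iter_batches(batch_size=self.chunk_size):
            chunk = record_batch.to_pandas()
            yield torch.tensor(chunk.values, dtype=torch.float32) # assumes numeric columns

# batch_size=None because the dataset already yields whole chunks
loader = DataLoader(ParquetChunkDataset("data.parquet", chunk_size=1024), batch_size=None)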

Upvotes: 4

Views: 5858

Answers (1)

Pleastry

Reputation: 434

I found a workaround using torch.utils.data.Dataset, but it requires preparing the data with dask beforehand so that each user becomes its own partition, saved as a separate parquet file that is only read when its index is requested. In the following code, the labels and the data are stored separately for a multivariate timeseries classification problem (but this can easily be adapted to other tasks as well):

import dask.dataframe as dd
import pandas as pd
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader, IterableDataset, Dataset

# Break the big file down into one partition per user
raw_ddf = dd.read_parquet("data.parquet") # Read huge file using dask
raw_ddf = raw_ddf.set_index("userid") # set the userid as index
userids = raw_ddf.index.unique().compute().values.tolist() # get a list of indices
new_ddf = raw_ddf.repartition(divisions = userids) # repartition by userids
new_ddf.to_parquet("my_folder") # this will save each user as its own parquet file within "my_folder"

# Dask to read the partitions
train_ddf = dd.read_parquet("my_folder/*.parquet") # read all files

# Read labels file
labels_df = pd.read_csv("label.csv")
y_labels = np.array(labels_df["class"])

# Define the Dataset class
class UsersDataset(Dataset):
    def __init__(self, dask_df, labels):
        self.dask_df = dask_df
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        # Read only the partition (i.e. the single user) that corresponds to this index
        X_df = self.dask_df.get_partition(idx).compute()
        X = np.row_stack([X_df]) # stack the user's rows into a 2D numpy array
        X_tensor = torch.tensor(X, dtype=torch.float32)
        y = self.labels[idx]
        y_tensor = torch.tensor(y, dtype=torch.long)
        sample = (X_tensor, y_tensor)
        return sample

# Create a Dataset object
user_dataset = UsersDataset(dask_df=train_ddf, labels=y_labels)

# Create a DataLoader object
dataloader = DataLoader(user_dataset, batch_size=4, shuffle=True, num_workers=0)

# Print output of the first batch to ensure it works
for i_batch, sample_batched in enumerate(dataloader): 
    print("Batch number ", i_batch)
    print(sample_batched[0]) # print X
    print(sample_batched[1]) # print y

    # stop after first batch.
    if i_batch == 0:
        break

I would like to know how I can adapt my approach when using >= 2 workers to read the data without duplicate entries. Any insights on this are greatly appreciated.
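One direction I have considered, but not tested, is to switch to an IterableDataset and shard the partition indices across workers with torch.utils.data.get_worker_info(), so that each worker reads a disjoint slice of the partitions. This is only a sketch and reuses the train_ddf and y_labels objects from above:

import math
import numpy as np
import torch
from torch.utils.data import IterableDataset, DataLoader, get_worker_info

class UsersIterableDataset(IterableDataset):
    def __init__(self, dask_df, labels):
        self.dask_df = dask_df
        self.labels = labels

    def __iter__(self):
        # Figure out which slice of partitions this worker is responsible for
        worker_info = get_worker_info()
        n = len(self.labels)
        if worker_info is None: # single-process data loading
            start, end = 0, n
        else: # split the partition indices evenly across workers
            per_worker = int(math.ceil(n / worker_info.num_workers))
            start = worker_info.id * per_worker
            end = min(start + per_worker, n)
        for idx in range(start, end):
            X_df = self.dask_df.get_partition(idx).compute()
            X_tensor = torch.tensor(np.row_stack([X_df]), dtype=torch.float32)
            y_tensor = torch.tensor(self.labels[idx], dtype=torch.long)
            yield X_tensor, y_tensor

# Each partition is read by exactly one worker, so nothing should be duplicated
iterable_loader = DataLoader(UsersIterableDataset(train_ddf, y_labels), batch_size=4, num_workers=2)

Note that shuffle=True is not supported for an IterableDataset, so any shuffling would have to be done over the index range inside __iter__.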

Upvotes: 4
