Reputation: 35
I've been trying to create a custom Dataloader that can serve batches of data that are all same-sized to feed into a Conv2d layer for classification purposes.
Here's some test data
`X` has shape NUMBER_OF_POINTS x CHOICES x NUM_FEATURES x 1, while `y` is the label (an integer from 0 to CHOICES-1).
I'm having trouble writing the Sampler and Dataloader.
import random
from collections import defaultdict
from typing import Iterator, List, Sequence

import numpy as np
import torch
from sklearn.utils import shuffle
from torch.utils.data import DataLoader, Dataset, Sampler
# Sampling weights, one per bucket generated below (14 buckets in total).
sample_probs = np.array([
    2.04302017e-03, 6.84249612e-03, 3.18776004e-02, 6.69332322e-01,
    1.79056125, 1.63388916, 1.31819391, 1.43798623,
    2.44057406, 5.51664089e-01, 9.66624185e-02, 1.67495225e-02,
    3.59960696e-03, 2.43216687e-05,
])

# Number of random samples to generate per bucket, keyed by bucket position
# (position k holds the samples that have k + 2 choices).
i_dict = {
    0: 19,
    1: 63,
    2: 30,
    3: 6192,
    4: 16564,
    5: 15115,
    6: 12195,
    7: 13303,
    8: 22578,
    9: 5103,
    10: 894,
    11: 155,
    12: 33,
    13: 2,
}

# One (features, labels) pair per bucket, ordered by number of choices (2..15).
train_datasets = []
for i in range(2, 16):
    count = i_dict[i - 2]
    # Draw the whole bucket in one call instead of building `count` tiny
    # tensors and stacking them: features are COUNT x CHOICES x 4 x 1.
    X = torch.rand(count, i, 4, 1)
    # Labels are int64 class indices in [0, i - 1]; torch.randint's upper
    # bound is exclusive, hence `i` rather than `i - 1`.
    y = torch.randint(0, i, (count,))
    train_datasets.append((X, y))
class WeightedBucketSampler(torch.utils.data.Sampler):
    """Sampler that picks one bucket by weight and yields that bucket's indices.

    ``data`` is indexed as ``data[i][0]`` / ``data[i][1]``; ``len(data[i][0])``
    is taken as the number of samples in bucket ``i``. Buckets are stored under
    the key ``i + 2``. Indices are offset by the sizes of all preceding buckets.

    Note that a single pass over the sampler only ever covers the first bucket
    drawn by ``torch.multinomial``; the remaining draws are discarded.
    """

    def __init__(self, data, weights: Sequence[float], num_samples: int,
                 replacement: bool = True, generator=None, shuffle=True, drop_last=False):
        super().__init__(data)
        # Sampling configuration (shuffle and drop_last are stored only).
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.weights = torch.as_tensor(weights, dtype=torch.double)
        self.num_samples = num_samples
        self.replacement = replacement
        self.generator = generator

        # data holds, per bucket, a tensor of
        # COUNT x NUM_ROUTES x FEATURES x 1 and a tensor with the
        # corresponding labels.
        self.buckets = defaultdict(list)
        total = 0
        for position in range(len(data)):
            entry = data[position]
            inputs = entry[0]
            labels = entry[1]
            self.buckets[position + 2].extend([inputs, labels])
            total += len(inputs)
        self.length = total

    def __iter__(self) -> Iterator[int]:
        # Draw bucket positions by weight, but keep only the first draw.
        draws = torch.multinomial(
            self.weights,
            self.num_samples,
            self.replacement,
            generator=self.generator,
        )
        chosen = draws.tolist()[0]

        # Flat index of the chosen bucket's first element.
        offset = 0
        for earlier in range(chosen):
            offset += len(self.buckets[earlier + 2][0])

        # Random order over the chosen bucket, shifted into flat index space.
        bucket_size = len(self.buckets[chosen + 2][0])
        permutation = torch.randperm(bucket_size, generator=self.generator)
        for flat_index in (permutation + offset).tolist():
            yield flat_index

    def __len__(self):
        # Total number of samples across all buckets.
        return self.length
class CustomDataset(Dataset):
    """Map-style dataset that flattens per-bucket tensors into one index space.

    ``data[i][0]`` and ``data[i][1]`` are indexed row by row; each row gets the
    next consecutive integer key, so bucket ``i``'s rows directly follow those
    of bucket ``i - 1``.
    """

    def __init__(self, data):
        # Both mappings share the same consecutive integer keys.
        self.routes = {}
        self.choice = {}
        flat_index = 0
        for bucket_no in range(len(data)):
            bucket = data[bucket_no]
            for row in range(len(bucket[0])):
                self.routes[flat_index] = bucket[0][row]
                self.choice[flat_index] = bucket[1][row]
                flat_index += 1

    def __len__(self):
        # One entry per flattened row.
        return len(self.choice)

    def __getitem__(self, idx):
        label = self.choice[idx]
        features = self.routes[idx]
        return features, label
# Flatten the per-bucket tensors into a single index-addressable dataset.
train_datasets_ds = CustomDataset(train_datasets)

# The sampler is built from the raw per-bucket list, with one weight per bucket.
bucket_sampler = WeightedBucketSampler(
    train_datasets,
    sample_probs,
    len(sample_probs),
    shuffle=True,
    drop_last=False,
)

# The DataLoader groups the sampler's indices into batches of 32.
loader = DataLoader(
    train_datasets_ds,
    sampler=bucket_sampler,
    batch_size=32,
    pin_memory=True,
)

# Print the shape of every batch produced in one pass.
for X, y in loader:
    print(X.size(), y.size())
This code is a combination of WeightedRandomSampler and Bucket sampling code
I'm essentially using the per-bucket sample weights to choose a bucket, and then choosing randomly from that bucket to form batches of up to batch_size.
However, when iterating through loader, I get the output:
...
torch.Size([32, 10, 4, 1]) torch.Size([32])
torch.Size([32, 10, 4, 1]) torch.Size([32])
torch.Size([32, 10, 4, 1]) torch.Size([32])
torch.Size([18, 10, 4, 1]) torch.Size([18])
The sizes of these batches add up to the number of elements in bucket 10, so the batching within a bucket is right, but the loader never jumps to another bucket. Rerunning the code
for X,y in loader:
print(X.size(),y.size())
will produce another bucket's batches.
I'm still learning PyTorch, so some of the code might be inefficient. Would love some advice as well!
Upvotes: 0
Views: 700
Reputation: 35
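Thanks to some help on the unofficial PyTorch Discord channel (sudomaze), I've fixed my problem. The sampler needs to iterate over every sampled bucket instead of stopping after the first one, and it yields whole batches of indices, so it is passed to the DataLoader as a batch_sampler.
The __len__ function in the sampler also needed fixing.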
class WeightedBucketSampler(Sampler[List[int]]):
def __init__(self, data, weights: Sequence[float], num_samples: int,
replacement: bool = True, generator=None, shuffle=True, batch_size=32, drop_last=False):
super().__init__(data)
self.shuffle = shuffle
self.drop_last = drop_last
self.weights = torch.as_tensor(weights, dtype=torch.double)
self.num_samples = num_samples
self.replacement = replacement
self.generator = generator
self.batch_size = batch_size
self.buckets = defaultdict(list)
'''data is a CustomDataset containing a tensor of COUNT x NUM_ROUTES x FEATURES x 1 and a tensor with the corresponding labels'''
counter = 0
for i in range(len(data)):
self.buckets[i+2] += [data[i][0],data[i][1]]
counter += len(data[i][0])
self.length = counter
def __iter__(self) -> Iterator[int]:
# Choose a bucket depending on the weighted sample
rand_bucket = torch.multinomial(self.weights, self.num_samples, self.replacement, generator=self.generator)
batch = [0] * self.batch_size
idx_in_batch = 0
for bucket_idx in rand_bucket.tolist():
bucketsample_count = 0
shifter = sum([len(self.buckets[i+2][0]) for i in range(bucket_idx)])
# Generate random indices from the bucket and shift them
rand_tensor = torch.randperm(len(self.buckets[bucket_idx+2][0]), generator=self.generator)
# print(len(self.buckets[bucket_idx+2][0]), len(rand_tensor.tolist()))
for idx in rand_tensor.tolist():
batch[idx_in_batch] = idx+shifter
idx_in_batch += 1
if idx_in_batch == self.batch_size:
bucketsample_count += self.batch_size
yield batch
idx_in_batch = 0
batch = [0] * self.batch_size
if idx_in_batch > 0:
bucketsample_count += idx_in_batch
yield batch[:idx_in_batch]
# The last remaining tensors are added into one batch. Terminate batch and move to next bucket
idx_in_batch = 0
batch = [0] * self.batch_size
continue
def __len__(self):
return (self.length + (self.batch_size - 1)) // self.batch_size
class CustomDataset(Dataset):
    """Dataset exposing every row of every bucket under one running index.

    For each bucket ``data[i]``, row ``j`` of ``data[i][0]`` and of
    ``data[i][1]`` is stored under the next unused integer key, so lookups by
    flat index return the matching ``(routes, choice)`` pair.
    """

    def __init__(self, data):
        self.routes = dict()
        self.choice = dict()
        for bucket_pos in range(len(data)):
            rows_in_bucket = len(data[bucket_pos][0])
            for row_pos in range(rows_in_bucket):
                # Keys are consecutive, so the current size is the next key.
                key = len(self.choice)
                self.routes[key] = data[bucket_pos][0][row_pos]
                self.choice[key] = data[bucket_pos][1][row_pos]

    def __len__(self):
        # Number of flattened rows stored.
        return len(self.choice)

    def __getitem__(self, idx):
        picked_choice = self.choice[idx]
        picked_routes = self.routes[idx]
        return picked_routes, picked_choice
# Batch size used by the sampler; it was referenced below without ever being
# defined. 32 matches the sampler's own default.
batch_size = 32

# Number of samples in each bucket.
w = np.array([len(bucket[0]) for bucket in train_datasets])
# Re-weight the buckets: inverse of the original weight, scaled by bucket size.
sample_probs = 1 / sample_probs * w

train_datasets_ds = CustomDataset(train_datasets)
bucket_sampler = WeightedBucketSampler(
    train_datasets,
    sample_probs,
    len(sample_probs),
    shuffle=True,
    batch_size=batch_size,
    drop_last=False,
)
# The sampler already yields complete batches of indices, so it is passed as
# batch_sampler and the DataLoader is given no batch_size of its own.
train_loader = DataLoader(train_datasets_ds, batch_sampler=bucket_sampler)
Upvotes: 1