paweller
paweller

Reputation: 228

TensorFlow-Keras generator: Turn off auto-sharding or switch auto_shard_policiy to DATA

While training my model I ran into the issue described in the post Tensorflow - Keras: Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. My question now is: Does the solution mentioned by @Graham501617 work with generators as well? Here is some dummy code for what I use so far:

class BatchGenerator(Sequence):

    def __init__(self, some_args):
        ...

    def __len__(self):
        num_batches_in_sequence = ...

    def __getitem__(self, _):
        data, labels = get_one_batch(self.some_args)
        return data, labels

In the main script I do something like:

train_generator = BatchGenerator(some_args)
valid_generator = BatchGenerator(some_args)

cross_device_ops = tf.distribute.HierarchicalCopyAllReduce(num_packs=2)
strategy = tf.distribute.MirroredStrategy(cross_device_ops=cross_device_ops)
with strategy.scope():
    model = some_model

model.compile(some_args)

history = model.fit(
    x=train_generator,
    validation_data=valid_generator,
    ...
)

I would probably have to modify the __getitem__ function somehow, do I?

I appreciate your support!

Upvotes: 1

Views: 2342

Answers (1)

omrii
omrii

Reputation: 413

You'd have to wrap your generator into a single function...

Example below assumes your data is stored as numpy array (.npy), each file already has the correct amount of mini-batch size, is labeled 0_x.npy, 1_x.npy, 2_x.npy, etc.. and both data and label arrays are float64.


from pathlib import Path
import tensorflow as tf
import numpy as np

# Your new generator as a function rather than an object you need to instantiate
def getNextBatch(stop, data_dir):
    i = 0
    data_dir = data_dir.decode('ascii')
    while True:
        while i < stop:
            x = np.load(str(Path(data_dir + "/" + str(i) + "_x.npy")))
            y = np.load(str(Path(data_dir + "/" + str(i) + "_y.npy")))
            yield x, y
            i += 1
        i = 0

# Make a dataset given the directory and strategy
def makeDataset(generator_func, dir, strategy=None):

     # Get amount of files
     data_size = int(len([name for name in os.listdir(dir) if os.path.isfile(os.path.join(dir, name))])/2)
    
     ds = tf.data.Dataset.from_generator(generator_func, args=[data_size, dir], output_types=(tf.float64, tf.float64)) # Make a dataset from the generator. MAKE SURE TO SPECIFY THE DATA TYPE!!!
    
     options = tf.data.Options()
     options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
     ds = ds.with_options(options)
    
     # Optional: Make it a distributed dataset if you're using a strategy
     if strategy is not None:
          ds = strategy.experimental_distribute_dataset(ds)

     return ds



training_ds = makeDataset(getNextBatch, str(Path(data_dir + "/training")), None)
validation_ds = makeDataset(getNextBatch, str(Path(data_dir + "/validation")), None)

model.fit(training_ds,
          epochs=epochs,
          callbacks=callbacks,
          validation_data=validation_ds)

You might need to pass the amount of steps per epoch in your fit() call, in which case you can use the generator you've already made.

Upvotes: 1

Related Questions