Reputation: 3109
I am trying to optimize my data input pipeline. The dataset is a set of 450 TFRecord files of ~70MB each, hosted on GCS. The job is executed with GCP ML Engine. There is no GPU.
Here is the pipeline:
def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        buffer_size=2048
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).map(
        map_func=_parse_example_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).prefetch(
        buffer_size=1
    )
With the mapped function:
def _bit_to_float(string_batch: tf.Tensor):
    # Unpack each byte of the raw string into 8 floats in {0.0, 1.0}, MSB first.
    return tf.reshape(
        tf.math.floormod(
            tf.dtypes.cast(
                tf.bitwise.right_shift(
                    tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
                    tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
                ),
                tf.float32
            ),
            2
        ),
        (tf.shape(string_batch)[0], -1)
    )
def _parse_example_batch(example_batch):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = _bit_to_float(samples["booleanFeatures"])
    return (
        tf.concat([dense_float, bits_to_float], 1),
        tf.reshape(samples["label"], (-1, 1))
    )
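For intuition, here is a quick check of what _bit_to_float produces on a tiny batch (an illustrative snippet, not part of the pipeline):

# A batch of one 1-byte string: each byte unpacks to 8 floats, MSB first.
s = tf.constant([bytes([0b10100000])])
print(_bit_to_float(s))  # [[1. 0. 1. 0. 0. 0. 0. 0.]]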
I tried to follow the best practices of the data pipeline tutorial and to vectorize my mapped function (as advised by mrry).
With these settings, data is downloaded at high speed (bandwidth is around 200MB/s), but the CPU is under-used (14%) and training is very slow (more than one hour per epoch).
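Before tuning parameters, it helps to time the dataset on its own, to separate input-pipeline throughput from model time. A minimal sketch (an illustration assuming TF 2.x; the bucket path is a placeholder):

import time
import tensorflow as tf

def benchmark(dataset, num_batches=100):
    # Iterate over the dataset alone to measure input-pipeline throughput.
    start = time.perf_counter()
    for _ in dataset.take(num_batches):
        pass
    elapsed = time.perf_counter() - start
    print(f"{num_batches / elapsed:.2f} batches/s")

benchmark(build_dataset("gs://my-bucket/train-*.tfrecord"))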
I tried some parameter configurations, changing the interleave() arguments like num_parallel_calls or cycle_length, or the TFRecordDataset arguments like num_parallel_reads.

The fastest configuration uses this set of parameters (see the sketch just below for how they plug into the pipeline):

- interleave.num_parallel_calls: 1
- interleave.cycle_length: 8
- TFRecordDataset.num_parallel_reads: 8

With this one, one epoch only takes ~20 minutes to run. However, CPU usage is only at 50% while bandwidth consumption is around 55MB/s.
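Spelled out, one plausible wiring of those three values looks like this (a sketch for illustration; file_pattern is the same GCS glob as above):

dataset = tf.data.Dataset.list_files(file_pattern).interleave(
    # Each shard is itself read with 8 parallel reads.
    lambda f: tf.data.TFRecordDataset(f, num_parallel_reads=8),
    cycle_length=8,        # pull from 8 shards at a time
    num_parallel_calls=1,  # a single thread drives the interleave
)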
Why does tf.data.experimental.AUTOTUNE not find the best value to speed up the training?

Kind regards, Alexis.
After some more experimentation, I came to the following solution:

1. Remove the interleave step, which is already handled by TFRecordDataset if num_parallel_reads is greater than 0.
2. Update the mapped function to only do parse_example and decode_raw, returning a tuple `((features, boolean_features), label)`.
3. cache after the map.
4. Move the _bit_to_float function into the model, as a component.

Finally, here is the data pipeline code:
import multiprocessing

def build_dataset(file_pattern):
    return tf.data.TFRecordDataset(
        tf.data.Dataset.list_files(file_pattern),
        num_parallel_reads=multiprocessing.cpu_count(),
        buffer_size=70*1000*1000  # ~one 70MB shard of read-ahead
    ).shuffle(
        buffer_size=2048
    ).map(
        map_func=split,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).batch(
        batch_size=2048,
        drop_remainder=True,
    ).cache(
    ).repeat(
    ).prefetch(
        buffer_size=32
    )
def split(example):
    preprocessed_sample_columns = {
        "features": tf.io.VarLenFeature(tf.float32),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
    dense_float = tf.sparse.to_dense(samples["features"])
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (dense_float, bits_to_float),
        tf.reshape(samples["label"], (1,))
    )
def build_model(input_shape):
    # N floats and M bytes of booleans come straight from the dataset;
    # dataset._bit_to_float refers to the pipeline module above.
    feature = keras.Input(shape=(N,))
    bool_feature = keras.Input(shape=(M,), dtype="uint8")
    one_hot = dataset._bit_to_float(bool_feature)
    dense_input = tf.reshape(
        keras.backend.concatenate([feature, one_hot], 1),
        input_shape)
    output = actual_model(dense_input)
    model = keras.Model([feature, bool_feature], output)
    return model
def _bit_to_float(string_batch: tf.Tensor):
    # Same bit-unpacking as in the question, but masking with bitwise_and
    # and casting to float only once, at the end.
    return tf.dtypes.cast(
        tf.reshape(
            tf.bitwise.bitwise_and(
                tf.bitwise.right_shift(
                    tf.expand_dims(string_batch, 2),
                    tf.reshape(
                        tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
                        (1, 1, 8)
                    ),
                ),
                tf.constant(0x01, dtype=tf.uint8)
            ),
            (tf.shape(string_batch)[0], -1)
        ),
        tf.float32
    )
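Wiring it together could look like the following (a sketch: N, M, actual_model, the bucket path, and steps_per_epoch are placeholders, and the (-1, N + 8 * M) shape assumes actual_model consumes a flat feature vector):

train_ds = build_dataset("gs://my-bucket/train-*.tfrecord")
# M bytes of booleans unpack to 8*M floats inside the model.
model = build_model(input_shape=(-1, N + 8 * M))
model.compile(optimizer="adam", loss="binary_crossentropy")
# The dataset repeats forever, so Keras needs an explicit steps_per_epoch.
model.fit(train_ds, epochs=10, steps_per_epoch=1000)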
Thanks to all these optimizations, this seems to be a good first setup. But CPU and bandwidth are still not fully used, so any advice is still welcome!
So, after some benchmarking, I came across what I think is our best input pipeline:
def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )
def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
So, what's new:

- TFRecordDataset interleaving is a legacy one, so the interleave function is better.
- batch before map is a good habit (vectorizing your function) and reduces the number of times the mapped function is called.
- No need of repeat anymore. Since TF2.0, the Keras model API supports the dataset API and can use cache (see the SO post).
- Switch from a VarLenFeature to a FixedLenSequenceFeature, removing a useless call to tf.sparse.to_dense (see the sketch below for the difference).
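To illustrate that last point, here is a small self-contained comparison (a sketch): VarLenFeature yields a tf.SparseTensor that must go through tf.sparse.to_dense, while FixedLenSequenceFeature with allow_missing=True yields a dense tensor directly.

import tensorflow as tf

example = tf.train.Example(features=tf.train.Features(feature={
    "features": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0, 2.0, 3.0]))
}))
examples = tf.constant([example.SerializeToString()])

sparse_spec = {"features": tf.io.VarLenFeature(tf.float32)}
dense_spec = {"features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True)}

print(tf.io.parse_example(examples, sparse_spec)["features"])  # SparseTensor
print(tf.io.parse_example(examples, dense_spec)["features"])   # tf.Tensor([[1. 2. 3.]])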
Hope this can help. Advice is still welcome.
Upvotes: 26
Views: 5785
Reputation: 333
I have a further suggestion to add:
According to the documentation of interleave(), you can use a mapping function as its first parameter.
This means, one can write:
dataset = tf.data.Dataset.list_files(file_pattern)
dataset = dataset.interleave(
    lambda x: tf.data.TFRecordDataset(x).map(
        parse_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE),
    cycle_length=tf.data.experimental.AUTOTUNE,
    num_parallel_calls=tf.data.experimental.AUTOTUNE
)
As I understand it, this maps a parsing function to each shard, and then interleaves the results. This then eliminates the use of dataset.map(...)
later on.
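One caveat: because parsing now happens inside interleave, before batching, parse_fn sees one serialized example at a time. A per-example variant of the parse function (a sketch reusing the feature spec from the accepted answer) could look like:

def parse_fn(serialized):
    spec = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1),
    }
    sample = tf.io.parse_single_example(serialized, spec)
    return (
        (sample["features"], tf.io.decode_raw(sample["booleanFeatures"], tf.uint8)),
        tf.reshape(sample["label"], (1,)),
    )

Note that this trades away the batched-parse vectorization from the accepted answer, so both variants are worth benchmarking.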
Upvotes: 0
Reputation:
Mentioning the solution and the important observations of @AlexisBRENON in the answer section, for the benefit of the community.

Below are the important observations:

- TFRecordDataset interleaving is a legacy one, so the interleave function is better.
- batch before map is a good habit (vectorizing your function) and reduces the number of times the mapped function is called.
- No need of repeat anymore. Since TF2.0, the Keras model API supports the dataset API and can use cache (see the SO post).
- Switch from a VarLenFeature to a FixedLenSequenceFeature, removing a useless call to tf.sparse.to_dense.

Code for the pipeline, with improved performance, in line with the above observations, is mentioned below:
def build_dataset(file_pattern):
    return tf.data.Dataset.list_files(
        file_pattern
    ).interleave(
        tf.data.TFRecordDataset,
        cycle_length=tf.data.experimental.AUTOTUNE,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).shuffle(
        2048
    ).batch(
        batch_size=64,
        drop_remainder=True,
    ).map(
        map_func=parse_examples_batch,
        num_parallel_calls=tf.data.experimental.AUTOTUNE
    ).cache(
    ).prefetch(
        tf.data.experimental.AUTOTUNE
    )
def parse_examples_batch(examples):
    preprocessed_sample_columns = {
        "features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
        "booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
        "label": tf.io.FixedLenFeature((), tf.float32, -1)
    }
    samples = tf.io.parse_example(examples, preprocessed_sample_columns)
    bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
    return (
        (samples['features'], bits_to_float),
        tf.expand_dims(samples["label"], 1)
    )
Upvotes: 15