Reputation: 11377
I'd like to serialize a large numpy ndarray to TFRecord. Trouble is, the process is painfully slow. For an array of size (1000000, 65) it takes almost a minute. Serializing the same array to other binary formats (HDF5, npy, parquet, ...) takes less than a second. I am pretty sure there's a much faster way to serialize it, but I just can't figure it out.
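For reference, a minimal timing sketch of what I mean by "less than a second" (using np.save; exact numbers will of course vary by machine):

import time
import numpy as np

X = np.random.randn(1000000, 65)

start = time.perf_counter()
np.save('X.npy', X)  # raw binary dump of the same array
print(f'np.save: {time.perf_counter() - start:.2f}s')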
import numpy as np
import tensorflow as tf

X = np.random.randn(1000000, 65)

def write_tf_dataset(data: np.ndarray, path: str):
    with tf.io.TFRecordWriter(path=path) as writer:
        for record in data:
            feature = {
                'X': tf.train.Feature(float_list=tf.train.FloatList(value=record[:42])),
                'Y': tf.train.Feature(float_list=tf.train.FloatList(value=record[42:64])),
                'Z': tf.train.Feature(float_list=tf.train.FloatList(value=[record[64]])),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            serialized = example.SerializeToString()
            writer.write(serialized)

write_tf_dataset(X, 'X.tfrecord')
How can I increase the performance of write_tf_dataset? The size of my X is 200x larger than in the snippet.
I am not the first one to complain about the slow performance of TFRecord. Based on this TensorFlow GitHub issue I made a second version of the function:
import pickle

def write_tf_dataset(data: np.ndarray, path: str):
    with tf.io.TFRecordWriter(path=path) as writer:
        for record in data:
            feature = {
                'X': tf.io.serialize_tensor(record[:42]).numpy(),
                'Y': tf.io.serialize_tensor(record[42:64]).numpy(),
                'Z': tf.io.serialize_tensor(record[64]).numpy(),
            }
            serialized = pickle.dumps(feature)
            writer.write(serialized)
... but it performed even worse. Ideas?
Upvotes: 3
Views: 808
Reputation: 96
A workaround is using the multiprocessing package. You can write from many processes to the same TFRecord file, or make each process write to a different file. I think having multiple (small) TFRecords is the recommended approach, instead of a single (big) file, since it is faster to read from multiple sources (see the reading sketch after the code):
import multiprocessing
import os

import numpy as np
import tensorflow as tf

def serialize_example(record):
    feature = {
        "X": tf.train.Feature(float_list=tf.train.FloatList(value=record[:42])),
        "Y": tf.train.Feature(float_list=tf.train.FloatList(value=record[42:64])),
        "Z": tf.train.Feature(float_list=tf.train.FloatList(value=[record[64]])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()

def write_tfrecord(tfrecord_path, records):
    with tf.io.TFRecordWriter(tfrecord_path) as writer:
        for item in records:
            writer.write(serialize_example(item))

if __name__ == "__main__":
    np.random.seed(1234)
    data = np.random.randn(1000000, 65)

    # Option 1: serialize in parallel, write to a single file
    tfrecord_path = "/home/appuser/data/data.tfrecord"
    with multiprocessing.Pool(4) as pool:
        with tf.io.TFRecordWriter(tfrecord_path) as writer:
            for example in pool.map(serialize_example, data):
                writer.write(example)

    # Option 2: write to multiple files, one process per shard
    procs = []
    n_shard = 4
    num_per_shard = int(np.ceil(len(data) / n_shard))
    for shard_id in range(n_shard):
        filename = f"data_{shard_id + 1:04d}_of_{n_shard:04d}.tfrecord"
        tfrecord_path = os.path.join("/home/appuser/data", filename)
        start_index = shard_id * num_per_shard
        end_index = min((shard_id + 1) * num_per_shard, len(data))
        proc = multiprocessing.Process(
            target=write_tfrecord,
            args=(tfrecord_path, data[start_index:end_index]),
        )
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()
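For completeness, here is a minimal sketch of reading the shards from Option 2 back in parallel with tf.data; the glob pattern and the 42/22/1 feature lengths are assumed from the writing code above:

import tensorflow as tf

# Matches the shard names produced by Option 2 above.
files = tf.data.Dataset.list_files("/home/appuser/data/data_*_of_0004.tfrecord")

# interleave pulls records from several shards concurrently,
# which is where the multiple-small-files layout pays off.
dataset = files.interleave(
    tf.data.TFRecordDataset,
    cycle_length=4,
    num_parallel_calls=tf.data.AUTOTUNE,
)

# Feature lengths follow the 42/22/1 split used when writing.
feature_spec = {
    "X": tf.io.FixedLenFeature([42], tf.float32),
    "Y": tf.io.FixedLenFeature([22], tf.float32),
    "Z": tf.io.FixedLenFeature([1], tf.float32),
}
dataset = dataset.map(lambda s: tf.io.parse_single_example(s, feature_spec))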
Upvotes: 1