Lukasz Tracewski

Reputation: 11377

Slow serialization of ndarray to TFRecord

I'd like to serialize a large numpy ndarray to TFRecord. The trouble is, the process is painfully slow. For an array of shape (1000000, 65) it takes almost a minute. Serializing the same array to other binary formats (HDF5, npy, Parquet, ...) takes less than a second. I am pretty sure there's a much faster way to serialize it, but I just can't figure it out.
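For reference, the gap can be reproduced with a timing snippet along these lines (the .npy path is arbitrary; timing write_tf_dataset below works the same way):

import time
import numpy as np

X = np.random.randn(1000000, 65)

t0 = time.perf_counter()
np.save('X.npy', X)  # plain binary dump of the same array
print(f'np.save took {time.perf_counter() - t0:.2f} s')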

import numpy as np
import tensorflow as tf

X = np.random.randn(1000000, 65)

def write_tf_dataset(data: np.ndarray, path: str):
    with tf.io.TFRecordWriter(path=path) as writer:
        for record in data:
            feature = {'X': tf.train.Feature(float_list=tf.train.FloatList(value=record[:42])),
                       'Y': tf.train.Feature(float_list=tf.train.FloatList(value=record[42:64])),
                       'Z': tf.train.Feature(float_list=tf.train.FloatList(value=[record[64]]))}
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            serialized = example.SerializeToString()
            writer.write(serialized)

write_tf_dataset(X, 'X.tfrecord')

How can I increase the performance of write_tf_dataset? My actual X is 200x larger than in the snippet.

I am not the first one to complain about slow TFRecord performance. Based on this TensorFlow GitHub issue, I made a second version of the function:

import pickle

def write_tf_dataset(data: np.ndarray, path: str):
    with tf.io.TFRecordWriter(path=path) as writer:
        for record in data:
            feature = {
                'X': tf.io.serialize_tensor(record[:42]).numpy(),
                'Y': tf.io.serialize_tensor(record[42:64]).numpy(),
                'Z': tf.io.serialize_tensor(record[64]).numpy(),
            }
            serialized = pickle.dumps(feature)
            writer.write(serialized)

... but it performed even worse. Ideas?
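As an aside, pickle output can't be parsed back by tf.data. If the tf.io.serialize_tensor route is kept, the usual pattern is to wrap the serialized bytes in a BytesList feature so the file remains a valid stream of Examples. A sketch of that variant, not benchmarked here (write_tf_dataset_v3 is just an illustrative name):

import numpy as np
import tensorflow as tf

def write_tf_dataset_v3(data: np.ndarray, path: str):
    with tf.io.TFRecordWriter(path=path) as writer:
        for record in data:
            # Each slice is serialized to bytes and stored in a BytesList feature.
            feature = {
                'X': tf.train.Feature(bytes_list=tf.train.BytesList(
                    value=[tf.io.serialize_tensor(record[:42]).numpy()])),
                'Y': tf.train.Feature(bytes_list=tf.train.BytesList(
                    value=[tf.io.serialize_tensor(record[42:64]).numpy()])),
                'Z': tf.train.Feature(bytes_list=tf.train.BytesList(
                    value=[tf.io.serialize_tensor(record[64]).numpy()])),
            }
            example = tf.train.Example(features=tf.train.Features(feature=feature))
            writer.write(example.SerializeToString())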

Upvotes: 3

Views: 808

Answers (1)

myagues

Reputation: 96

A workaround is to use the multiprocessing package. You can have several processes serialize in parallel and write to a single TFRecord file, or have each process write its own file. I think multiple small TFRecords are the recommended approach over one big file, since it is faster to read from multiple sources:

import multiprocessing
import os

import numpy as np
import tensorflow as tf


def serialize_example(record):
    feature = {
        "X": tf.train.Feature(float_list=tf.train.FloatList(value=record[:42])),
        "Y": tf.train.Feature(float_list=tf.train.FloatList(value=record[42:64])),
        "Z": tf.train.Feature(float_list=tf.train.FloatList(value=[record[64]])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


def write_tfrecord(tfrecord_path, records):
    with tf.io.TFRecordWriter(tfrecord_path) as writer:
        for item in records:
            serialized = serialize_example(item)
            writer.write(serialized)


if __name__ == "__main__":
    np.random.seed(1234)
    data = np.random.randn(1000000, 65)

    # Option 1: write to a single file
    tfrecord_path = "/home/appuser/data/data.tfrecord"
    with multiprocessing.Pool(4) as pool:  # serialize rows in 4 worker processes
        with tf.io.TFRecordWriter(tfrecord_path) as writer:
            for example in pool.map(serialize_example, data):
                writer.write(example)

    # Option 2: write to multiple files
    procs = []
    n_shard = 4
    num_per_shard = int(np.ceil(len(data) / n_shard))
    for shard_id in range(n_shard):
        filename = f"data_{shard_id + 1:04d}_of_{n_shard:04d}.tfrecord"
        tfrecord_path = os.path.join("/home/appuser/data", filename)

        start_index = shard_id * num_per_shard
        end_index = min((shard_id + 1) * num_per_shard, len(data))

        args = (tfrecord_path, data[start_index:end_index])
        p = multiprocessing.Process(target=write_tfrecord, args=args)
        p.start()
        procs.append(p)

    for proc in procs:
        proc.join()
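If you go with the sharded files, the reading side can also fan out across shards. A sketch of a reader (the glob pattern and the 42/22/1 feature shapes follow the writer above):

import tensorflow as tf

# Interleave reads across all shards written by Option 2.
files = tf.data.Dataset.list_files("/home/appuser/data/data_*.tfrecord")
dataset = files.interleave(
    tf.data.TFRecordDataset,
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)

# Feature spec matches the 42 / 22 / 1 float split used in serialize_example.
feature_spec = {
    "X": tf.io.FixedLenFeature([42], tf.float32),
    "Y": tf.io.FixedLenFeature([22], tf.float32),
    "Z": tf.io.FixedLenFeature([1], tf.float32),
}
dataset = dataset.map(
    lambda example: tf.io.parse_single_example(example, feature_spec),
    num_parallel_calls=tf.data.experimental.AUTOTUNE,
)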

Upvotes: 1
