Reputation: 4691
I need to break my 4 GB data-set file into small chunks. To reduce the time this takes, I would like to maximize parallel processing. Currently, I can see that the CPU and GPU cores are underutilized. See the attached output in the image here.
My code snippet looks like this:
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def serialize_row(text, rating):
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = {
        'text': _bytes_feature(text),
        'rating': _float_feature(rating),
    }
    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

def transform(example):
    str_example = example.decode("utf-8")
    json_example = json.loads(str_example)
    overall = json_example.get('overall', -99)
    text = json_example.get('reviewText', '')
    if type(text) is str:
        text = bytes(text, 'utf-8')
    tf_serialized_string = serialize_row(text, overall)
    return tf_serialized_string

line_dataset = tf.data.TextLineDataset(filenames=[file_path])
line_dataset = line_dataset.map(lambda row: tf.numpy_function(transform, [row], tf.string))
line_dataset = line_dataset.shuffle(2)
line_dataset = line_dataset.batch(NUM_OF_RECORDS_PER_BATCH_FILE)

'''
Perform batchwise transformation of the population.
'''
start = time.time()
for idx, line in line_dataset.enumerate():
    FILE_NAMES = 'test{0}.tfrecord'.format(idx)
    end = time.time()
    time_taken = end - start
    tf.print('Processing for file - {0}'.format(FILE_NAMES))
    DIRECTORY_URL = '/home/gaurav.gupta/projects/practice/'
    filepath = os.path.join(DIRECTORY_URL, 'data-set', 'electronics', FILE_NAMES)
    batch_ds = tf.data.Dataset.from_tensor_slices(line)
    writer = tf.data.experimental.TFRecordWriter(filepath)
    writer.write(batch_ds)
    tf.print('Processing for file - {0} took {1}'.format(FILE_NAMES, time_taken))
tf.print('Done')
Logs showing the execution flow:
Processing for file - test0.tfrecord took 14.350863218307495
Processing for file - test1.tfrecord took 12.695453882217407
Processing for file - test2.tfrecord took 12.904462575912476
Processing for file - test3.tfrecord took 12.344425439834595
Processing for file - test4.tfrecord took 11.188365697860718
Processing for file - test5.tfrecord took 11.319620609283447
Processing for file - test6.tfrecord took 11.285977840423584
Processing for file - test7.tfrecord took 11.169529438018799
Processing for file - test8.tfrecord took 11.289997816085815
Processing for file - test9.tfrecord took 11.431073188781738
Processing for file - test10.tfrecord took 11.428141593933105
Processing for file - test11.tfrecord took 3.223125457763672
Done
I have tried the num_parallel_reads argument but couldn't see much difference. I believe it is mainly useful when reading multiple files rather than a single big file.
I am looking for suggestions on how to parallelize this task to reduce the time it takes.
Upvotes: 0
Views: 387
Reputation: 399
I would try something like this. I like to use joblib because it is simple to drop into existing code, though you could probably do something similar with many other frameworks. Note that joblib uses neither the GPU nor any JIT compilation:
from joblib import Parallel, delayed
from tqdm import tqdm
...
def process_file(idx, line):
    FILE_NAMES = 'test{0}.tfrecord'.format(idx)
    end = time.time()
    time_taken = end - start
    tf.print('Processing for file - {0}'.format(FILE_NAMES))
    DIRECTORY_URL = '/home/gaurav.gupta/projects/practice/'
    filepath = os.path.join(DIRECTORY_URL, 'data-set', 'electronics', FILE_NAMES)
    batch_ds = tf.data.Dataset.from_tensor_slices(line)
    writer = tf.data.experimental.TFRecordWriter(filepath)
    writer.write(batch_ds)
    #tf.print('Processing for file - {0} took {1}'.format(FILE_NAMES, time_taken))
    return FILE_NAMES, time_taken

# No total= is passed to tqdm: len() raises TypeError for a dataset whose
# cardinality is unknown, which is the case for one built from TextLineDataset.
times = Parallel(n_jobs=12, prefer="processes")(
    delayed(process_file)(idx, line)
    for idx, line in tqdm(line_dataset.enumerate())
)
print('Done.')
This is untested code and I am also unsure how it will work with the tf code, but I would give it a try.
The tqdm is totally unnecessary; it is just something I prefer to use because it provides a nice progress bar.
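If handing TensorFlow tensors to worker processes turns out to be a problem, a variant that keeps TensorFlow out of the parent process entirely might be worth trying. The sketch below is also untested against the real data and makes several assumptions: it uses the standard-library ProcessPoolExecutor instead of joblib, it reads the file directly rather than through tf.data, it writes records with tf.io.TFRecordWriter, and the helper names (parse_review, chunks, write_shard, convert) are made up for illustration. Non-string reviewText values are coerced with str(), which is a choice of this sketch, not something the question specifies.

```python
import itertools
import json
import os
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


def parse_review(line):
    """Extract (text_bytes, rating) from one JSON line.

    Uses the same fields and defaults as transform() in the question.
    """
    record = json.loads(line)
    text = record.get('reviewText', '')
    if not isinstance(text, str):
        text = str(text)  # assumption: coerce unexpected types to text
    return text.encode('utf-8'), float(record.get('overall', -99))


def chunks(iterable, size):
    """Yield successive lists of at most `size` items."""
    iterator = iter(iterable)
    while True:
        chunk = list(itertools.islice(iterator, size))
        if not chunk:
            return
        yield chunk


def write_shard(idx, lines, out_dir):
    """Worker: parse raw lines and write them to one TFRecord file."""
    # Imported here so only the worker processes load TensorFlow.
    import tensorflow as tf

    path = os.path.join(out_dir, 'test{0}.tfrecord'.format(idx))
    with tf.io.TFRecordWriter(path) as writer:
        for line in lines:
            text, rating = parse_review(line)
            feature = {
                'text': tf.train.Feature(
                    bytes_list=tf.train.BytesList(value=[text])),
                'rating': tf.train.Feature(
                    float_list=tf.train.FloatList(value=[rating])),
            }
            example = tf.train.Example(
                features=tf.train.Features(feature=feature))
            writer.write(example.SerializeToString())
    return path


def convert(file_path, out_dir, lines_per_file, workers=4):
    """Split file_path into shards written by a pool of processes.

    At most 2 * workers chunks are held in flight, so the whole file
    is never loaded into memory at once.
    """
    pending, written = set(), []
    with open(file_path, 'rb') as src, \
            ProcessPoolExecutor(max_workers=workers) as pool:
        for idx, lines in enumerate(chunks(src, lines_per_file)):
            if len(pending) >= 2 * workers:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                written.extend(f.result() for f in done)
            pending.add(pool.submit(write_shard, idx, lines, out_dir))
        written.extend(f.result() for f in pending)
    return sorted(written)
```

It would be called as convert(file_path, out_dir, NUM_OF_RECORDS_PER_BATCH_FILE), from under an if __name__ == '__main__': guard so that it also works on platforms that spawn worker processes. The JSON parsing and serialization then run in separate processes, so the GIL is not shared between them; whether that beats the tf.data pipeline on this data is something you would have to measure.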
Upvotes: 1