Reputation: 1808
I have millions of entities of a particular type that I would like to export to a CSV file. The following code writes entities in batches of 1000 to a blob, keeping the blob open and deferring the next batch to the task queue. When there are no more entities to fetch, the blob is finalized. This seems to work in most of my local testing, but I wanted to know:
1. Am I missing any gotchas or corner cases before running it on my production data and paying for the datastore reads?
2. If the deadline is exceeded or memory runs out while a batch is being written to the blob, this code falls back to the start of the current batch when the task is retried, which may cause a lot of duplication. Any suggestions for fixing that?
import csv

from google.appengine.api import files
from google.appengine.datastore.datastore_query import Cursor
from google.appengine.ext import deferred
from google.appengine.runtime import DeadlineExceededError


def entities_to_csv(entity_type, blob_file_name='', cursor='', batch_size=1000):
    q = entity_type.query()
    # An empty cursor string means "start from the beginning".
    start_cursor = Cursor.from_websafe_string(cursor) if cursor else None
    results, next_curs, more = q.fetch_page(batch_size, start_cursor=start_cursor)
    if results:
        try:
            if not blob_file_name:
                blob_file_name = files.blobstore.create(
                    mime_type='text/csv',
                    _blobinfo_uploaded_filename='%s.csv' % entity_type.__name__)
            rows = [e.to_dict() for e in results]
            # Append this batch to the still-open blob.
            with files.open(blob_file_name, 'a') as f:
                writer = csv.DictWriter(f, restval='', extrasaction='ignore',
                                        fieldnames=rows[0].keys())
                writer.writerows(rows)
            if more:
                # Defer the next batch, resuming from the new cursor.
                deferred.defer(entities_to_csv, entity_type, blob_file_name,
                               next_curs.to_websafe_string())
            else:
                files.finalize(blob_file_name)
        except DeadlineExceededError:
            # Re-defer the current batch from its starting cursor.
            deferred.defer(entities_to_csv, entity_type, blob_file_name, cursor)
Later in the code, something like:
deferred.defer(entities_to_csv, Song)
Upvotes: 1
Views: 121
Reputation: 15143
Check out this Google I/O video; it describes pretty much what you want to do using MapReduce, starting at around the 23:15 mark. The code you want is at 27:19:
https://developers.google.com/events/io/sessions/gooio2012/307/
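For reference, the job shown in that session is roughly the shape below, built on the open-source appengine-mapreduce library. This is only a sketch, not the talk's code verbatim: the handler path 'myapp.export.row_for_entity', the shard count, and the param layout are assumptions to adapt to your project, and the map function assumes an ndb model so that to_dict() is available.

from mapreduce import base_handler
from mapreduce import mapper_pipeline

def row_for_entity(entity):
    # Illustrative map function: turn one entity into one CSV line
    # (assumes an ndb model, so to_dict() exists).
    props = entity.to_dict()
    yield ','.join(str(props[k]) for k in sorted(props)) + '\n'

class ExportToCsvPipeline(base_handler.PipelineBase):
    def run(self, entity_kind):
        # Read every entity of the kind, map each one to CSV lines, and
        # write the yielded lines to a blobstore file.
        yield mapper_pipeline.MapperPipeline(
            'export_to_csv',
            handler_spec='myapp.export.row_for_entity',
            input_reader_spec='mapreduce.input_readers.DatastoreInputReader',
            output_writer_spec='mapreduce.output_writers.BlobstoreOutputWriter',
            params={'entity_kind': entity_kind, 'mime_type': 'text/csv'},
            shards=8)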
Upvotes: 0
Reputation: 31928
The problem with your current solution is that memory use grows with every write you perform to the blobstore: a blobstore file is immutable, and all of its data is written out at once from memory.
You need to run the job on a backend that can hold all the records in memory: define a backend in your application and call defer with _target='<backend name>'.
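For example (the backend name 'exporter' and the B4 instance class below are placeholders, not anything from your app), declare the backend in backends.yaml:

backends:
- name: exporter
  class: B4
  instances: 1
  options: dynamic

and route the deferred task to it instead of a frontend instance:

# Runs entities_to_csv on the 'exporter' backend.
deferred.defer(entities_to_csv, Song, _target='exporter')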
Upvotes: 1