Reputation: 2607
I am trying to copy a large, compressed file (.gz) to another compressed file (.gz) using Python. I will perform intermediate processing on the data that is not present in my code sample. I would like to use multiprocessing with locks so that multiple processes can write to the new gzip file in parallel, but I get an invalid format error on the output .gz file.
I assume this is because a lock alone is not enough to support writing to a gzip file in parallel. Since compressed data requires "knowledge" of the data that came before it in order to make correct entries into the archive, I don't think Python can handle this by default. I'd guess that each process maintains its own view of the gzip output and that this state diverges after the first write.
If I open the target file in the script without using gzip then this all works. I could also write to multiple gzip files and merge them, but I'd prefer to avoid that if possible.
Here is my source code:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush()
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock)
            write_lock.acquire()
            print("Writer",ID,"-","Write Lock:",write_lock)
            for line in queue_message:
                print("Line write:",line)
                outfile.write(line)
            write_lock.release()
            print("Writer",ID,"-","Write Lock:",write_lock)

def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    writer_procs=2
    chunk_size=1
    queue_size=96
    data_queue = Queue(queue_size)
    coordinator_queue=Queue()
    write_lock=Lock()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile = gzip.open(outfile_path, 'wt')
    #Works when it is uncompressed
    #outfile=open(outfile_path, 'w')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in writers:
        process.start()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()
Notes about the code:
I would guess that I need a library that can somehow coordinate the compressed writes between the different processes. Obviously this points to using a single process to perform the writes (like the coordinator process); however, that would likely introduce a bottleneck.
There are some related posts on Stack Overflow, but none that seem to specifically address what I am trying to do. I also see utilities like "mgzip", "pigz" and "migz" that can parallelize compression, but I don't believe they are applicable to this use case: mgzip didn't work in my testing (zero-sized file), pigz appears to consume an entire file as input on the command line, and migz is a Java library, so I'm not sure how I could integrate it into Python.
If it can't be done then so be it but any answer would be appreciated!
------- Update and working code:
With help from Mark Adler I was able to create a multiprocessing script that compresses the data in parallel and uses a single writer process to append it to the target .gz file. With the throughput of a modern NVMe drive, this reduces the likelihood of becoming CPU-bound by compression before becoming I/O-bound.
The largest changes that needed to be made to make this code work are as follows:
- gzip.compress(bytes(string, 'utf-8'), compresslevel=9) was needed to compress an individual "block" or "stream".
- file = open(outfile, 'wb') was needed in order to open an unencoded binary output file that can become the target gzip.
- The file.write() action had to take place from a single process, as it must be executed serially.
It is worth noting that this does not write to the file in parallel, but rather handles the compression in parallel. The compression is the heavy-lifting part of this process anyway.
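Stripped of the multiprocessing machinery, those three changes boil down to something like the sketch below (the path and sample lines are placeholders, not the real data):
import gzip

outfile_path = '/tmp/example_output.json.gz'   #placeholder path

#Chunks of text as a compressor process would assemble them
chunks = ['{"record": 1}\n{"record": 2}\n', '{"record": 3}\n{"record": 4}\n']

#1) Each chunk is compressed into its own complete gzip stream
compressed_chunks = [gzip.compress(bytes(chunk, 'utf-8'), compresslevel=9) for chunk in chunks]

#2) The target file is opened as a plain binary file, not with gzip.open()
#3) A single writer appends the compressed streams one after the other
with open(outfile_path, 'wb') as outfile:
    for compressed_chunk in compressed_chunks:
        outfile.write(compressed_chunk)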
Updated code (tested and works as-is):
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def compressor(data_queue, compressed_queue, coordinator_queue):
    print("Compressor Started.")
    while True:
        chunk = ''
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion
            coordinator_queue.put('COMPRESS_DONE')
            #Process exit
            break
        else:
            for line in queue_message:
                #Assemble concatenated string from list
                chunk += line
            #Encode the string as binary so that it can be compressed
            #Setting gzip compression level to 9 (highest)
            compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)
            compressed_queue.put(compressed_chunk)

def writer(outfile, compressed_queue, coordinator_queue):
    print("Writer Started.")
    while True:
        queue_message = compressed_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            outfile.write(queue_message)

def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_compressors=compressor_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(compressor_procs)]
        if queue_message=='COMPRESS_DONE':
            active_compressors = active_compressors-1
            if active_compressors == 0:
                while not compressed_queue.qsize() == 0:
                    continue
                [compressed_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    compressor_procs=2
    #writer_procs really needs to stay as 1 since writing must be done serially
    #This could probably be written out...
    writer_procs=1
    chunk_size=600
    queue_size=96
    data_queue = Queue(queue_size)
    compressed_queue=Queue(queue_size)
    coordinator_queue=Queue()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile=open(outfile_path, 'wb')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
    writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in compressors:
        process.start()
    for process in writers:
        process.start()
    for process in compressors:
        process.join()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()
Upvotes: 1
Views: 1949
Reputation: 112339
It's actually quite straightforward to do by writing complete gzip streams from each thread to a single output file. Yes, you will need one thread that does all the writing, with each compression thread taking turns writing all of its gzip stream, before another compression thread gets to write any. The compression threads can all do their compression in parallel, but the writing needs to be serialized.
The reason this works is that the gzip standard, RFC 1952, says that a gzip file consists of a series of members, where each member is a gzip header, compressed data, and a gzip trailer.
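As a minimal sketch of that idea (the file name and sample data here are made up for illustration): each worker produces a complete gzip member with gzip.compress(), the single writer appends the members back to back, and a standard gzip reader consumes them as one continuous stream.
import gzip

#Hypothetical chunks that two compression workers would handle independently
chunks = [b'first chunk of records\n', b'second chunk of records\n']

#Each gzip.compress() call yields a complete member: header, compressed data, trailer
members = [gzip.compress(chunk, compresslevel=9) for chunk in chunks]

#Concatenating the members produces a valid multi-member gzip file
with open('concatenated_example.gz', 'wb') as out:
    for member in members:
        out.write(member)

#gzip.open() reads straight through all members as a single stream
with gzip.open('concatenated_example.gz', 'rb') as f:
    assert f.read() == b''.join(chunks)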
Upvotes: 7