Shrout1

Reputation: 2607

Python multiprocessing write to single gzip in parallel

I am trying to copy a large, compressed file (.gz) to another compressed file (.gz) using Python. I will perform intermediate processing on the data that is not shown in my code sample. I would like to use multiprocessing with locks to write to the new gzip in parallel from multiple processes, but I get an invalid format error on the output .gz file.

I assume that this is because a lock is not enough to support writing to a gzip in parallel. Since compressed data requires "knowledge" of the data that came before it in order to make correct entries into the archive, I don't think that Python can handle this by default. I'd guess that each process maintains its own awareness of the gzip output and that this state diverges after the first write.

If I open the target file in the script without using gzip then this all works. I could also write to multiple gzips and merge them, but I'd prefer to avoid that if possible.
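For reference, a rough sketch of that merge fallback (paths are placeholders): since gzip files concatenated back to back still form a single valid gzip stream, merging the per-process archives would just be byte concatenation:

import shutil

# Placeholder paths; each part would be written separately by its own process.
part_paths = ['/directory/part_0.json.gz', '/directory/part_1.json.gz']

with open('/directory/merged_records.json.gz', 'wb') as merged:
    for path in part_paths:
        with open(path, 'rb') as part:
            shutil.copyfileobj(part, merged)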

Here is my source code:

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break
        
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush() 
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock)
            write_lock.acquire()
            print("Writer",ID,"-","Write Lock:",write_lock)
            for line in queue_message:
                print("Line write:",line)
                outfile.write(line)
            write_lock.release()
            print("Writer",ID,"-","Write Lock:",write_lock)

def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    writer_procs=2
    chunk_size=1
    queue_size=96
    data_queue = Queue(queue_size)
    coordinator_queue=Queue()
    write_lock=Lock()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile = gzip.open(outfile_path, 'wt')
    #Works when it is uncompressed
    #outfile=open(outfile_path, 'w')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]   
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))

    coordinator_p.start()
    for process in readers:
        process.start()
    for process in writers:
        process.start()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

Notes about the code:

I would guess that I need a library that can somehow coordinate the compressed writes between the different processes. The obvious alternative is to have a single process perform the write (like the coordinator process), but that would likely introduce a bottleneck.

There are some related posts on Stack Overflow, but none that seem to specifically address what I am trying to do. I also see utilities like "mgzip", "pigz" and "migz" that can parallelize compression, but I don't believe they are applicable to this use case. mgzip didn't work in my testing (it produced a 0-sized file), pigz appears to consume an entire file as input on the command line, and migz is a Java library, so I'm not sure how I could integrate it into Python.
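One untested idea: pigz, like gzip, should also accept data on stdin, so it might be possible to pipe the processed lines through it from Python rather than handing it a whole file. A rough sketch, assuming pigz is installed and on PATH:

import gzip
import subprocess

with open('/directory/output_records.json.gz', 'wb') as out:
    # -c writes the compressed stream to stdout, -p sets the number of compression threads.
    pigz = subprocess.Popen(['pigz', '-c', '-p', '4'], stdin=subprocess.PIPE, stdout=out)
    with gzip.open('/directory/input_records.json.gz', 'rt') as infile:
        for line in infile:
            # Intermediate processing would happen here.
            pigz.stdin.write(line.encode('utf-8'))
    pigz.stdin.close()
    pigz.wait()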

If it can't be done then so be it, but any answer would be appreciated!

------- Update and working code:

With help from Mark Adler I was able to create a multiprocessing script that compresses the data in parallel and uses a single writer process to add it to the target .gz file. With the throughput of a modern NVMe drive, this reduces the likelihood of the script becoming CPU bound by compression before it becomes I/O bound.

The largest changes needed to make this code work are as follows:

- The write lock is gone; all writing is done by a single writer process, so no locking is required.
- New compressor processes pull chunks from the data queue and compress each chunk into a complete gzip member with gzip.compress.
- The writer receives the already-compressed bytes on a separate compressed_queue and appends them to the output file, which is now opened with a plain open(..., 'wb') instead of gzip.open.
- The coordinator also tracks the compressor processes and only sends 'DONE' to the next stage once the previous stage's queue has drained.

It is worth noting that this does not write to the file in parallel, but rather handles the compression in parallel. The compression is the heavy-lifting part of this process anyway.

Updated code (tested and works as-is):

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def compressor(data_queue, compressed_queue, coordinator_queue):
    print("Compressor Started.")
    while True:
        chunk = ''
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('COMPRESS_DONE')
            #Process exit
            break
        else:
            for line in queue_message:
                #Assemble concatenated string from list
                chunk += line
            #Encode the string as binary so that it can be compressed
            #Setting gzip compression level to 9 (highest)
            compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)            
            compressed_queue.put(compressed_chunk)

def writer(outfile, compressed_queue, coordinator_queue):
    print("Writer Started.")
    while True:
        queue_message = compressed_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            outfile.write(queue_message)

def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_compressors=compressor_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(compressor_procs)]
        if queue_message=='COMPRESS_DONE':
            active_compressors = active_compressors-1
            if active_compressors == 0:
                while not compressed_queue.qsize() == 0:
                    continue
                [compressed_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    compressor_procs=2
    #writer_procs really needs to stay as 1 since writing must be done serially
    #This could probably be written out...
    writer_procs=1
    chunk_size=600
    queue_size=96
    data_queue = Queue(queue_size)
    compressed_queue=Queue(queue_size)
    coordinator_queue=Queue()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile=open(outfile_path, 'wb')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
    writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in compressors:
        process.start()     
    for process in writers:
        process.start()
    for process in compressors:
        process.join()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

Upvotes: 1

Views: 1949

Answers (1)

Mark Adler

Reputation: 112339

It's actually quite straightforward to do by writing complete gzip streams from each thread to a single output file. Yes, you will need one thread that does all the writing, with each compression thread taking turns writing all of its gzip stream before another compression thread gets to write any. The compression threads can all do their compression in parallel, but the writing needs to be serialized.

The reason this works is that the gzip standard, RFC 1952, says that a gzip file consists of a series of members, where each member is a gzip header, compressed data, and a gzip trailer.
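For illustration, here is a minimal sketch of that property using Python's gzip module (the file name is arbitrary):

import gzip

# Each gzip.compress() call produces a complete gzip member: header, compressed data, trailer.
members = [gzip.compress(b'compressed by worker 1\n'),
           gzip.compress(b'compressed by worker 2\n')]

# Writing the members back to back yields a valid multi-member gzip file.
with open('combined.gz', 'wb') as f:
    for member in members:
        f.write(member)

# gzip.open() reads all of the concatenated members as one continuous stream.
with gzip.open('combined.gz', 'rb') as f:
    print(f.read())  # b'compressed by worker 1\ncompressed by worker 2\n'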

Upvotes: 7
