Slowpoke

Reputation: 1079

Python: compress and save/load large data from/into memory

I have a huge dictionary with numpy arrays as values, and it consumes almost all of my RAM. Pickling or compressing it in one piece is not possible. I've looked at some solutions that read/write in chunks using zlib, but they work with files, StringIO, etc., whereas I want to read/write from/into RAM.

Here is the closest example to what I want, but it has only the writing part. How can I read the object back after saving it this way, given that the chunks are written back to back and the compressed chunks naturally have different lengths?

import zlib


class ZlibWrapper():
    # chunksize is used to save memory, otherwise huge object will be copied
    def __init__(self, filename, chunksize=268435456): # 256 MB
        self.filename = filename
        self.chunksize = chunksize


    def save(self, data):
        """Saves a compressed object to disk
        """
        mdata = memoryview(data)
        with open(self.filename, 'wb') as f:
            for i in range(0, len(mdata), self.chunksize):
                mychunk = zlib.compress(bytes(mdata[i:i+self.chunksize]))
                f.write(mychunk)

    def load(self):

        # ???

        return data

Unfortunately, the uncompressed objects would be too large to send over the network, and zipping them externally would create additional complications.

Pickle, unfortunately, starts to consume RAM until the system hangs.

Following the discussion with Charles Duffy, here is my attempt at serialization (it does not work at the moment; it does not even compress the strings):

import zlib
import json
import numpy as np



mydict = {"a":np.array([1,2,3]),"b":np.array([4,5,6]),"c":np.array([0,0,0])}


#------------


# write to compressed stream ---------------------

def string_stream_serialization(dic):
    for key, val in dic.items():        
        #key_encoded = key.encode("utf-8")  # is not json serializable
        yield json.dumps([key,val.tolist()])


output = ""
compressor = zlib.compressobj()
decompressor = zlib.decompressobj()


stream = string_stream_serialization(mydict)

with open("outfile.compressed", "wb") as f:
    for s in stream:
        if not s:
            f.write(compressor.flush())
            break
        f.write(compressor.compress(s.encode('utf-8'))) # .encode('utf-8') converts to bytes




# read from compressed stream: --------------------

def read_in_chunks(file_object, chunk_size=1024): # I set another chunk size intentionally
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


reconstructed = {}

with open("outfile.compressed", "rb") as f:
    for s in read_in_chunks(f):
        data = decompressor.decompress(decompressor.unconsumed_tail + s)
        while data:
            arr = json.loads(data.decode("utf-8"))            
            reconstructed[arr[0]] = np.array(arr[1])
            data = decompressor.decompress(decompressor.unconsumed_tail)


print(reconstructed)

Upvotes: 2

Views: 7013

Answers (2)

napuzba

Reputation: 6288

To write a dictionary to disk, the zipfile module is a good fit.

  • When saving - Save each chunk as a file in the zip.
  • When loading - Iterate over the files in the zip and rebuild the data.
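The two steps above could be sketched roughly as follows. This is only an illustration, not napuzba's own code: the function names `save_dict_to_zip` and `load_dict_from_zip` are made up here, and it assumes every key is a unique string that is acceptable as a zip member name and that each array fits in memory on its own.

```python
import io
import zipfile

import numpy as np


def save_dict_to_zip(path, content):
    """Store each array as its own compressed member of a zip archive."""
    with zipfile.ZipFile(path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for key, arr in content.items():
            # Serialize one array at a time, so only a single array
            # is ever duplicated in memory.
            buf = io.BytesIO()
            np.save(buf, arr)
            # Assumption: the key is usable as a member name as-is.
            zf.writestr(key, buf.getvalue())


def load_dict_from_zip(path):
    """Rebuild the dictionary by iterating over the archive members."""
    result = {}
    with zipfile.ZipFile(path, "r") as zf:
        for name in zf.namelist():
            result[name] = np.load(io.BytesIO(zf.read(name)))
    return result
```

Because every member is compressed separately, a single key can also be read back on its own with `zf.read(name)`, without touching the rest of the archive. `numpy.savez_compressed` produces a similar layout (a zip of `.npy` members) if a ready-made function is preferred.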

Upvotes: 2

Charles Duffy

Reputation: 295281

Your first focus should be on having a sane way to serialize and deserialize your data. The question itself, and the comments on it, give us several constraints on your data:

  • Your data consists of a dictionary with a very large number of key/value pairs
  • All keys are unicode strings
  • All values are numpy arrays which are individually short enough to easily fit in memory at any given time (or even to allow multiple copies of any single value), although in aggregate the storage required becomes extremely large.

This suggests a fairly simple implementation:

import gzip  # used by the compression examples further down
import io
import struct

import numpy

# Fixed-width, little-endian 8-byte length prefix. The native 'L' format is
# 4 bytes on some platforms and 8 on others, so files would not be portable.
LEN_FMT = '<Q'

def serialize(f, content):
    for k,v in content.items():
        # write length of key, followed by key as string
        k_bstr = k.encode('utf-8')
        f.write(struct.pack(LEN_FMT, len(k_bstr)))
        f.write(k_bstr)
        # write length of value, followed by value in numpy.save format
        memfile = io.BytesIO()
        numpy.save(memfile, v)
        f.write(struct.pack(LEN_FMT, memfile.tell()))
        f.write(memfile.getvalue())

def deserialize(f):
    retval = {}
    while True:
        # an empty read at a record boundary means end of stream
        content = f.read(struct.calcsize(LEN_FMT))
        if not content: break
        k_len = struct.unpack(LEN_FMT, content)[0]
        k_bstr = f.read(k_len)
        k = k_bstr.decode('utf-8')
        v_len = struct.unpack(LEN_FMT, f.read(struct.calcsize(LEN_FMT)))[0]
        v_bytes = io.BytesIO(f.read(v_len))
        v = numpy.load(v_bytes)
        retval[k] = v
    return retval

As a simple test:

test_file = io.BytesIO()
serialize(test_file, {
    "First Key": numpy.array([123,234,345]),
    "Second Key": numpy.array([321,432,543]),
})

test_file.seek(0)
print(deserialize(test_file))

...so, we've got that -- now, how do we add compression? Easily.

with gzip.open('filename.gz', 'wb') as gzip_file:
    serialize(gzip_file, your_data)

...or, on the decompression side:

with gzip.open('filename.gz', 'rb') as gzip_file:
    your_data = deserialize(gzip_file)

This works because the gzip library already streams data out as it's requested, rather than compressing it or decompressing it all at once. There's no need to do windowing and chunking yourself -- just leave it to the lower layer.
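Since the question asks about reading and writing in RAM rather than on disk, the same idea can be pointed at an in-memory buffer. The following is a minimal sketch under a few assumptions: `dumps_compressed` and `loads_compressed` are hypothetical helper names, the length prefix uses a fixed 8-byte little-endian format chosen for this sketch, and the compressed result is assumed to fit in memory. The serialize/deserialize pair is repeated so that the block runs on its own.

```python
import gzip
import io
import struct

import numpy as np

# Fixed-width length prefix (an assumption of this sketch).
LEN = struct.Struct("<Q")


def serialize(f, content):
    for k, v in content.items():
        k_bstr = k.encode("utf-8")
        f.write(LEN.pack(len(k_bstr)))
        f.write(k_bstr)
        memfile = io.BytesIO()
        np.save(memfile, v)
        v_bytes = memfile.getvalue()
        f.write(LEN.pack(len(v_bytes)))
        f.write(v_bytes)


def deserialize(f):
    retval = {}
    while True:
        header = f.read(LEN.size)
        if not header:  # clean end of stream
            break
        (k_len,) = LEN.unpack(header)
        k = f.read(k_len).decode("utf-8")
        (v_len,) = LEN.unpack(f.read(LEN.size))
        retval[k] = np.load(io.BytesIO(f.read(v_len)))
    return retval


def dumps_compressed(content):
    """Return the gzip-compressed serialization as a bytes object."""
    buf = io.BytesIO()
    # GzipFile compresses incrementally as serialize() writes to it.
    with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
        serialize(gz, content)
    return buf.getvalue()


def loads_compressed(blob):
    """Rebuild the dictionary from bytes produced by dumps_compressed()."""
    with gzip.GzipFile(fileobj=io.BytesIO(blob), mode="rb") as gz:
        return deserialize(gz)
```

Only one array is held uncompressed in a temporary buffer at any time, so peak extra memory is roughly the compressed output plus a single value. The resulting bytes can be sent over a socket or written to disk unchanged.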

Upvotes: 4
