martinsarif

Reputation: 105

Streaming JSON Objects From a Large Compressed File

I am working on a personal project that involves reading in large gzip-compressed files of JSON objects, potentially millions of entries each. The problem I am having is figuring out how to parse these objects line by line and store them in memory efficiently enough that they do not use up all of the RAM on my system, while still being able to access or construct the objects later for analysis. What I have attempted so far is as follows:

import gzip
import json
from io import BytesIO

def parse_data(file):
    accounts = []
    with gzip.open(file, mode='rb') as accounts_data:
        for line in accounts_data:
            # if line is not empty
            if line.strip():
                account = BytesIO(line)
                accounts.append(account)
    return accounts

def getaccounts(accounts, idx):
   account = json.load(accounts[idx])
   # creates account object using fields in account dict
   return account_from_dict(account)

A major problem with this implementation is that I am unable to access the same object in accounts twice without a JSONDecodeError being raised. I am also not sure whether this is the most compact way I could be doing this.
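
For example (presumably because json.load reads the BytesIO to its end, so a second read sees an empty stream), accessing the same index twice fails:

accounts = parse_data('accounts.json.gz')   # example file name
first = getaccounts(accounts, 0)    # works
again = getaccounts(accounts, 0)    # raises json.decoder.JSONDecodeError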

Any assistance would be much appreciated.

Edit: The format of the data stored in these files is as follows:

{JSON Object 1}
{JSON Object 2}
...
{JSON Object n}

Edit: It is my intention to use the information stored in these JSON account entries to form a graph of similarities or patterns in account information.

Upvotes: 1

Views: 2025

Answers (3)

martineau

Reputation: 123453

Here's how to randomly access JSON objects in the gzipped file by first uncompressing it into a temporary file and then using tell() and seek() to retrieve them by index — thus requiring only enough memory to hold the offsets of each one.

I'm posting this primarily because you asked for an example of doing it in the comments; I wouldn't have posted it otherwise, because it's not quite the same thing as streaming the data. The major difference is that, unlike streaming, it gives access to all of the data, including the ability to randomly access any of the objects at will.

Uncompressing the entire file first does introduce some additional overhead, so unless you need to access the JSON objects more than once, it probably wouldn't be worth it. The implementation shown could probably be sped up by caching previously loaded objects, but without knowing precisely what the access patterns will be, it's hard to say for sure.

import collections.abc
import gzip
import json
import random
import tempfile


class GZ_JSON_Array(collections.abc.Sequence):
    """ Allows objects in gzipped file of JSON objects, one-per-line, to be
        treated as an immutable sequence of JSON objects.
    """
    def __init__(self, gzip_filename):
        self.tmpfile = tempfile.TemporaryFile('w+b')
        # Decompress a gzip file into a temp file and save offsets of the
        # start of each line in it.
        self.offsets = []
        with gzip.open(gzip_filename, mode='rb') as gzip_file:
            for line in gzip_file:
                line = line.rstrip().decode('utf-8')
                if line:
                    self.offsets.append(self.tmpfile.tell())
                    self.tmpfile.write(bytes(line + '\n', encoding='utf-8'))

    def __len__(self):
        return len(self.offsets)

    def __iter__(self):
        for index in range(len(self)):
            yield self[index]

    def __getitem__(self, index):
        """ Return a JSON object at offsets[index] in the given open file. """
        if index not in range(len(self.offsets)):
            raise IndexError
        self.tmpfile.seek(self.offsets[index])
        try:
            size = self.offsets[index+1] - self.offsets[index]  # Difference with next.
        except IndexError:
            size = -1  # Last one - read all remaining data.
        return json.loads(self.tmpfile.read(size).decode())

    def __del__(self):
        try:
            self.tmpfile.close()  # Allow it to auto-delete.
        except Exception:
            pass


if __name__ == '__main__':

    gzip_filename = 'json_objects.dat.gz'

    json_array = GZ_JSON_Array(gzip_filename)

    # Randomly access some objects in the JSON array.
    for index in random.sample(range(len(json_array)), 3):
        obj = json_array[index]
        print('object[{}]: {!r}'.format(index, obj))
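
As a rough sketch of the caching idea mentioned above (untested, and only worthwhile if a bounded cache fits your access patterns; note that functools.lru_cache also keeps a reference to the instance alive):

import functools

class CachedGZ_JSON_Array(GZ_JSON_Array):
    """ Same as GZ_JSON_Array, but remembers recently loaded objects. """
    @functools.lru_cache(maxsize=128)
    def __getitem__(self, index):
        # Cached on (self, index), so repeated access to the same object
        # avoids the seek/read/json.loads work.
        return super().__getitem__(index)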

Upvotes: 1

aghast

Reputation: 15310

Based on your answers in the comments, it seems like you just need to scan through the objects:

import gzip
import json

def evaluate_accounts(file):
    results = {}

    with gzip.open(file) as records:
        for json_rec in records:
            if json_rec.strip():
                account = json.loads(json_rec)
                # evaluate_account(...) is whatever per-account analysis you need
                results[account['id']] = evaluate_account(account)

    return results

Upvotes: 0

jspcal

Reputation: 51904

Hi, perhaps use an incremental JSON reader such as ijson, which does not require loading the entire structure into memory at once.
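
Something along these lines should work (a rough sketch; stream_accounts is just an illustrative name, and multiple_values is an option that recent versions of ijson provide for files containing many top-level JSON values, one per line like yours):

import gzip
import ijson

def stream_accounts(path):
    # Yield one account dict at a time instead of loading them all into memory.
    with gzip.open(path, 'rb') as f:
        for account in ijson.items(f, '', multiple_values=True):
            yield account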

Upvotes: 0
