Reputation: 105
I am working on a personal project that involves reading in large GZip-compressed files of JSON objects, each potentially containing millions of entries. The problem I am having is determining how to efficiently parse these objects line by line and store them in memory without using up all of the RAM on my system, while still being able to access or construct the objects later for analysis. What I have attempted so far is as follows:
import gzip
import json
from io import BytesIO

def parse_data(file):
    accounts = []
    with gzip.open(file, mode='rb') as accounts_data:
        for line in accounts_data:
            # if line is not empty
            if len(line.strip()) != 0:
                account = BytesIO(line)
                accounts.append(account)
    return accounts

def getaccounts(accounts, idx):
    account = json.load(accounts[idx])
    # creates account object using fields in account dict
    return account_from_dict(account)
A major problem with this implementation is that I am unable to access the same object in accounts twice without a JSONDecodeError being raised. I am also not sure whether this is the most compact way I could be doing this.
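For what it's worth, the repeated-read failure seems to come from json.load consuming the BytesIO stream, so a second load starts at the end of the stream unless it is rewound first. A minimal example:

from io import BytesIO
import json

buf = BytesIO(b'{"id": 1}')
print(json.load(buf))   # works the first time
# json.load(buf)        # a second load here raises JSONDecodeError (stream is at EOF)
buf.seek(0)             # rewinding the stream allows it to be parsed again
print(json.load(buf))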
Any assistance would be much appreciated.
Edit: The format of the data stored in these files is as follows:
{JSON Object 1}
{JSON Object 2}
...
{JSON Object n}
Edit: It is my intention to use the information stored in these JSON account entries to form a graph of similarities or patterns in account information.
Upvotes: 1
Views: 2025
Reputation: 123453
Here's how to randomly access the JSON objects in the gzipped file by first uncompressing it into a temporary file and then using tell() and seek() to retrieve them by index, which requires only enough memory to hold the offsets of each one.
I'm posting this primarily because you asked for an example in the comments...which I wouldn't have done otherwise, because it's not quite the same thing as streaming the data. The major difference is that, unlike streaming, it gives access to all of the data, including the ability to randomly access any of the objects at will.
Uncompressing the entire file first does introduce some additional overhead, so unless you need to access the JSON objects more than once, it probably wouldn't be worth it. The implementation shown could probably be sped up by caching previously loaded objects (see the sketch after the code below), but without knowing precisely what the access patterns will be, it's hard to say for sure.
import collections.abc
import gzip
import json
import random
import tempfile

class GZ_JSON_Array(collections.abc.Sequence):
    """ Allows objects in gzipped file of JSON objects, one-per-line, to be
        treated as an immutable sequence of JSON objects.
    """
    def __init__(self, gzip_filename):
        self.tmpfile = tempfile.TemporaryFile('w+b')
        # Decompress a gzip file into a temp file and save offsets of the
        # start of each line in it.
        self.offsets = []
        with gzip.open(gzip_filename, mode='rb') as gzip_file:
            for line in gzip_file:
                line = line.rstrip().decode('utf-8')
                if line:
                    self.offsets.append(self.tmpfile.tell())
                    self.tmpfile.write(bytes(line + '\n', encoding='utf-8'))

    def __len__(self):
        return len(self.offsets)

    def __iter__(self):
        for index in range(len(self)):
            yield self[index]

    def __getitem__(self, index):
        """ Return a JSON object at offsets[index] in the temporary file. """
        if index not in range(len(self.offsets)):
            raise IndexError
        self.tmpfile.seek(self.offsets[index])
        try:
            size = self.offsets[index+1] - self.offsets[index]  # Difference with next offset.
        except IndexError:
            size = -1  # Last one - read all remaining data.
        return json.loads(self.tmpfile.read(size).decode())

    def __del__(self):
        try:
            self.tmpfile.close()  # Allow it to auto-delete.
        except Exception:
            pass


if __name__ == '__main__':

    gzip_filename = 'json_objects.dat.gz'

    json_array = GZ_JSON_Array(gzip_filename)

    # Randomly access some objects in the JSON array.
    for index in random.sample(range(len(json_array)), 3):
        obj = json_array[index]
        print('object[{}]: {!r}'.format(index, obj))
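As for the caching idea mentioned above, here's a minimal sketch of one way to do it, assuming repeated accesses tend to revisit a fairly small working set (the subclass name and cache size are arbitrary):

import functools

class CachedGZ_JSON_Array(GZ_JSON_Array):
    """ Same as GZ_JSON_Array, but memoizes recently accessed objects. """
    @functools.lru_cache(maxsize=1024)
    def __getitem__(self, index):
        return super().__getitem__(index)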
Upvotes: 1
Reputation: 15310
Based on your answers in the comments, it seems like you just need to scan through the objects:
import gzip
import json

def evaluate_accounts(file):
    results = {}

    with gzip.open(file) as records:
        for json_rec in records:
            if json_rec.strip():
                account = json.loads(json_rec)
                results[account['id']] = evaluate_account(account)

    return results
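evaluate_account isn't shown here since it depends on what you want to compute per account; a purely hypothetical stub, just to make the above runnable:

def evaluate_account(account):
    # Hypothetical placeholder: compute whatever per-account metric you need.
    return len(account)

results = evaluate_accounts('accounts.json.gz')  # hypothetical filename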
Upvotes: 0