Aljen15

Reputation: 107

How to extract .zst files into a pandas dataframe

I'm a bit of a beginner when it comes to Python, but one of my school projects requires me to run classification algorithms on this Reddit popularity dataset. The files are huge .zst files and can be found here: https://files.pushshift.io/reddit/submissions/ I'm not sure how to extract this into a database, as the assignments we've had so far only used .csv datasets, which I could easily load into a pandas DataFrame. I stumbled upon a different post and tried using this code:

    def transform_zst_file(self,infile):
        zst_num_bytes = 2**22
        lines_read = 0
        dctx = zstd.ZstdDecompressor()
        with dctx.stream_reader(infile) as reader:
            previous_line = ""
            while True:
                chunk = reader.read(zst_num_bytes)
                if not chunk:
                    break
                string_data = chunk.decode('utf-8')
                lines = string_data.split("\n")
                for i, line in enumerate(lines[:-1]):
                    if i == 0:
                        line = previous_line + line
                    self.appendData(line, self.type)
                    lines_read += 1
                    if self.max_lines_to_read and lines_read >= self.max_lines_to_read:
                        return
                previous_line = lines[-1]

But I am not entirely sure how to put this into a pandas dataframe, or put only a certain percentage of datapoints into the dataframe if the file is too big. Any help would be very appreciated!

The following code crashes my computer every time I try to run it:

import zstandard as zstd  
your_filename = "..." 
with open(your_filename, "rb") as f:     
    data = f.read()  

dctx = zstd.ZstdDecompressor() 
decompressed = dctx.decompress(data)

This might be because the file is too big. Is there any way to extract just a percentage of this file into the pandas DataFrame?

Upvotes: 10

Views: 21347

Answers (6)

Jaakkonen

Reputation: 651

From version 1.4 onwards, pandas can decompress Zstandard (.zst) files if the zstandard package is installed. Earlier versions already had native support for '.gz', '.bz2', '.zip' and '.xz' compression.

If the file name ends with the .zst suffix, pandas infers the compression by default and can read the file.

import pandas
df = pandas.read_csv('my_file.csv.zst')
# This is equivalent to
#   df = pandas.read_csv('my_file.csv.zst', compression='zstd')
# for files ending with .zst

See more in Pandas read_csv documentation.

Upvotes: 6

Pedro Lobito

Reputation: 99001

There may be easier ways to achieve this, but to convert a .zst file from the Reddit dataset dumps to a valid JSON file using Python, I ended up using:

import zstandard as zstd

zst = '/path/to/file.zst'
with open(zst, "rb") as f:
    data = f.read()

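dctx = zstd.ZstdDecompressor()
# Note: this decompresses the whole file in memory. max_output_size sets the
# output buffer size used when the frame header does not store the decompressed size.
decompressed = dctx.decompress(data, max_output_size=1000000000) # 1GB
with open("/path/to/file.json", "w+") as f:
    # Each input line is one JSON object, so joining lines with commas yields a JSON array.
    f.write("[" + decompressed.decode("utf-8").strip().replace("\n", ",") + "]" )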

Read the json file:

import json

with open("/path/to/file.json") as f:
    data = json.load(f)
for d in data:
    print(d)

And there's always a bash script to the rescue, which seems easier (remember to install zstd and jq):

set -euxo pipefail
cat "/path/to/file.zst" | zstd -d | jq --compact-output '.created_utc = (.created_utc | tonumber)' > "/path/to/file.json"
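
For dumps that do not fit in memory, the Python conversion above could also be done as a stream. This is only a sketch, not a drop-in replacement: write_json_array and convert_zst_to_json_array are hypothetical names, and the max_window_size value is borrowed from the other answers. It writes the same "[obj,obj,...]" layout, one line at a time.

```python
import io


def write_json_array(lines, out):
    """Write newline-delimited JSON `lines` to text file `out` as one JSON array.

    Returns the number of records written.
    """
    out.write("[")
    count = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines instead of emitting empty elements
        if count:
            out.write(",")
        out.write(line)
        count += 1
    out.write("]")
    return count


def convert_zst_to_json_array(zst_path, json_path):
    """Stream a .zst dump into a JSON array file (hypothetical helper)."""
    import zstandard as zstd  # third-party; imported here so write_json_array has no dependencies

    dctx = zstd.ZstdDecompressor(max_window_size=2**31)
    with open(zst_path, "rb") as src, open(json_path, "w", encoding="utf-8") as dst:
        text = io.TextIOWrapper(dctx.stream_reader(src), encoding="utf-8")
        return write_json_array(text, dst)
```

Keep in mind that json.load on the resulting file still reads the whole array into memory, so this helps with the conversion step only.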

Upvotes: 0

plunker

Reputation: 1387

Unlike Bimba's answer, this doesn't read everything into memory; it processes the file one line at a time. This is useful when the compressed newline-delimited data is larger than available memory.

import io
import zstandard as zstd
from pathlib import Path
import json

DCTX = zstd.ZstdDecompressor(max_window_size=2**31)

def read_lines_from_zst_file(zstd_file_path:Path):
    with (
        zstd.open(zstd_file_path, mode='rb', dctx=DCTX) as zfh,
        io.TextIOWrapper(zfh, encoding='utf-8') as iofh  # decode explicitly; the default depends on the locale
    ):
        for line in iofh:
            yield line

if __name__ == "__main__":
    file = Path('some_zstd_file.zst')
    records = map(json.loads, read_lines_from_zst_file(file))
    for record in records:
        print(record.get('some-field'))
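
To address the "only a certain percentage" part of the question, the line generator above can be combined with a random filter before anything is parsed. A rough sketch, assuming one JSON object per line; sample_records and sample_to_dataframe are hypothetical helpers, not part of zstandard or pandas:

```python
import json
import random


def sample_records(lines, fraction, seed=0):
    """Yield roughly `fraction` of the JSON records found in `lines`."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in (0, 1]")
    rng = random.Random(seed)  # fixed seed keeps the sample reproducible
    for line in lines:
        if not line.strip():
            continue  # ignore blank lines
        # Decide before parsing, so skipped lines cost no JSON decoding.
        if rng.random() < fraction:
            yield json.loads(line)


def sample_to_dataframe(lines, fraction, seed=0):
    """Collect the sampled records into a DataFrame (requires pandas)."""
    import pandas as pd  # imported here so sample_records works without pandas

    return pd.DataFrame(list(sample_records(lines, fraction, seed)))


# Hypothetical usage with the generator defined above, keeping about 1% of the lines:
#     df = sample_to_dataframe(read_lines_from_zst_file(Path('some_zstd_file.zst')), 0.01)
```

Each line is kept independently with probability `fraction`, so the sample size is approximate rather than exact, but only the sampled records are ever held in memory.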
        
        

Upvotes: 4

Shahnawaz Akhtar

Reputation: 509

I used TextIOWrapper from the io module.

import io
import json
import zstandard

file_name = "..."  # path to the .zst file
with open(file_name, 'rb') as fh:
    dctx = zstandard.ZstdDecompressor(max_window_size=2147483648)
    stream_reader = dctx.stream_reader(fh)
    text_stream = io.TextIOWrapper(stream_reader, encoding='utf-8')
    for line in text_stream:
        obj = json.loads(line)
        # HANDLE OBJECT LOGIC HERE
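
If the goal is a pandas DataFrame and the whole file is too large for one, the object-handling step could work on fixed-size batches instead. This is just a sketch: iter_batches is a hypothetical helper and the batch size is arbitrary.

```python
import json
from itertools import islice


def iter_batches(lines, batch_size):
    """Yield lists of at most `batch_size` parsed records from `lines`."""
    # Lazily parse non-blank lines; nothing is read until a batch is requested.
    records = (json.loads(line) for line in lines if line.strip())
    while True:
        batch = list(islice(records, batch_size))
        if not batch:
            return
        yield batch


# Hypothetical usage with the text_stream from the snippet above:
#     import pandas as pd
#     for batch in iter_batches(text_stream, 100_000):
#         df = pd.DataFrame(batch)
#         # filter rows, keep columns or aggregate here before the next batch
```

Only one batch is in memory at a time, so each DataFrame can be reduced and the results combined at the end.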

Upvotes: 4

aryashah2k

Reputation: 538

I stumbled across a similar Reddit dataset consisting of .zst dumps. To iterate over the contents of your .zst file, I used the following code, which you can run as a script:

import zstandard
import os
import json
import sys
from datetime import datetime
import logging.handlers


log = logging.getLogger("bot")
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())


def read_lines_zst(file_name):
    with open(file_name, 'rb') as file_handle:
        buffer = b''
        reader = zstandard.ZstdDecompressor(max_window_size=2**31).stream_reader(file_handle)
        while True:
            chunk = reader.read(2**27)
            if not chunk:
                break
            # Split on raw bytes so a multi-byte UTF-8 character cut by a chunk boundary is never decoded in halves
            lines = (buffer + chunk).split(b"\n")

            for line in lines[:-1]:
                yield line.decode(), file_handle.tell()

            buffer = lines[-1]
        reader.close()


if __name__ == "__main__":
    file_path = sys.argv[1]
    file_size = os.stat(file_path).st_size
    file_lines = 0
    file_bytes_processed = 0
    created = None
    field = "subreddit"
    value = "wallstreetbets"
    bad_lines = 0
    try:
        for line, file_bytes_processed in read_lines_zst(file_path):
            try:
                obj = json.loads(line)
                created = datetime.utcfromtimestamp(int(obj['created_utc']))
                temp = obj[field] == value
            except (KeyError, json.JSONDecodeError) as err:
                bad_lines += 1
            file_lines += 1
            if file_lines % 100000 == 0:
                log.info(f"{created.strftime('%Y-%m-%d %H:%M:%S')} : {file_lines:,} : {bad_lines:,} : {(file_bytes_processed / file_size) * 100:.0f}%")
    except Exception as err:
        log.info(err)

    log.info(f"Complete : {file_lines:,} : {bad_lines:,}")
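
The script above only counts lines. To get the matching rows into pandas, the comparison in the loop could be turned into a filter along these lines. This is a sketch, not part of the original script: matching_records is a hypothetical helper, and the "title" and "score" keys in the usage comment are assumed field names.

```python
import json


def matching_records(lines, field, value, keep=None):
    """Yield records whose `field` equals `value`, optionally trimmed to the `keep` keys."""
    for line in lines:
        try:
            obj = json.loads(line)
            if obj[field] != value:
                continue
        except (KeyError, TypeError, json.JSONDecodeError):
            continue  # skip malformed lines and records without the field
        # Keeping only a few keys makes the resulting DataFrame much smaller.
        yield {k: obj.get(k) for k in keep} if keep else obj


# Hypothetical usage with read_lines_zst from the script above:
#     import pandas as pd
#     lines = (line for line, _ in read_lines_zst(file_path))
#     rows = matching_records(lines, "subreddit", "wallstreetbets",
#                             keep=["title", "score", "created_utc"])
#     df = pd.DataFrame(list(rows))
```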

Upvotes: 2

Bimba Shrestha

Reputation: 331

The file has been compressed using Zstandard (https://github.com/facebook/zstd), a compression library.

The easiest thing to do will probably be to install python-zstandard (https://pypi.org/project/zstandard/) using

pip install zstandard

and then, in a Python script, run something like

import zstandard as zstd

your_filename = "..."
with open(your_filename, "rb") as f:
    data = f.read()

dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress(data)

Now you can either use the decompressed data directly or write it to a file and then load it into pandas. Good luck!

Upvotes: 10
