MALAM

Reputation: 37

How to split big 30GB bz2 file into multiple small bz2 files and add a header to each

I have a large number of bz2-formatted files (30GB each) without any header. I can easily split each of them into 500M pieces with the following pipeline.

bzcat logging.abc_gps.bz2 | pv | split -b 500M -d -a 4 --filter='bzip2 > $FILE.csv.bz2' - splitted_file-

But I cannot add the header ['a' 'b' 'c' 'd' 'e' 'f' 'timestamp'] that I want to include in each of the split bz2 files.

More importantly, I want to split the file not by size (500M), but per day, based on the timestamp in the data (for example: splitted_file_2021-01-01.csv.bz2 and splitted_file_2021-01-02.csv.bz2).

Data is tab-delimited text, like below (no header; I need to add one):

19252547212 1   3041    2   1   74.18   1.8504  2021-05-01 00:00:00
19252547213 1   5055    2   1   0       0       2021-05-01 00:00:00
19252547214 1   5073    1   1   53.81   0.1836  2021-05-01 00:00:00
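For illustration, the date can be pulled out of one of these rows like this (a minimal sketch; the row is the first sample line above, and the timestamp is the last tab-separated field):

```python
# A sample row from the data, as bytes (tab-delimited, no header)
row = b"19252547212\t1\t3041\t2\t1\t74.18\t1.8504\t2021-05-01 00:00:00\n"

# the timestamp is the 8th tab-separated field; the date part precedes a space
date = row.split(b"\t")[7].split(b" ")[0]
print(date)  # b'2021-05-01'
```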

Upvotes: 0

Views: 953

Answers (1)

tdelaney

Reputation: 77337

You can use the bz2 package to open BZ2-encoded files and treat them as regular file objects. There is a minor performance advantage to reading and writing in binary. Assuming your data is either ASCII or UTF-8 and no tab characters need to be escaped, you can read the file line by line, opening and writing the outputs as new timestamps appear.

import bz2
import os

outfile = None
date = b""

with bz2.open("file") as fileobj:
    for line in fileobj:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            # close the previous file so its bz2 stream is finalized
            if outfile:
                outfile.close()
            new_file = f"splitted_file_{new_date.decode()}.csv.bz2"
            exists = os.path.exists(new_file)
            outfile = bz2.open(new_file, "ab")
            if not exists:
                outfile.write(b"\t".join([b'a', b'b', b'c', b'd', b'e', b'f', b'timestamp']) + b"\n")
        # write the row
        outfile.write(line)
if outfile:
    outfile.close()
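A note on the append mode used above: appending with bz2.open(..., "ab") writes a second bz2 stream after the first, and Python reads such multi-stream files back as a single stream, so existing data stays readable. A minimal sketch (demo.csv.bz2 is a throwaway file name):

```python
import bz2

# first run: create the file with one bz2 stream
with bz2.open("demo.csv.bz2", "wb") as f:
    f.write(b"a\tb\n")

# second run: "ab" appends a second bz2 stream after the first
with bz2.open("demo.csv.bz2", "ab") as f:
    f.write(b"1\t2\n")

# reading joins the streams transparently
with bz2.open("demo.csv.bz2", "rb") as f:
    print(f.read())  # b'a\tb\n1\t2\n'
```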

You may be able to speed this up with a pipeline. Give both the decompression and compression to separate bzip2 processes that will run in parallel on different cores. Instead of a shell pipeline, you can create the pipes and files in the script itself. Assuming bzip2 exists on your system, you could do the following. I added the tqdm module to print progress along the way.

#!/usr/bin/env python3

import subprocess as subp
from pathlib import Path
import sys
import tqdm

# TODO: Better command line
try:
    in_file_name = Path(sys.argv[1])
except IndexError:
    print("usage: unbzcsv.py filename")
    exit(1)

# build the format string used for generating output file names
out_file_name_fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
out_file = None
date = b""
bzwriter = None
bzfile = None

# run bzip2 to decompress to stdout
bzreader = subp.Popen(["bzip2", "--decompress", "--stdout", in_file_name], 
        stdin=subp.DEVNULL, stdout=subp.PIPE)

# use tqdm to display progress as line count
progress = tqdm.tqdm(bzreader.stdout, desc="Lines", unit=" lines", unit_scale=True)

# read lines and fan out to files
try:
    for line in progress:
        # get date from, ex. "2021-05-01 00:00:00", timestamp
        new_date = line.split(b"\t")[7].split(b" ")[0]
        # roll to new file as needed, appending, so existing data not overwritten
        if new_date != date:
            date = new_date
            out_file_name = out_file_name_fmt.format(date.decode("utf-8"))
            if bzwriter is not None:
                bzwriter.stdin.close()
                bzwriter.wait()
                bzwriter = None
                bzfile.close()
            print("\nwriting", out_file_name)
            progress.refresh()
            bzfile = open(out_file_name, "wb")
            bzwriter = subp.Popen(["bzip2", "--compress"],
                    stdin=subp.PIPE, stdout=bzfile)
        # write the row
        bzwriter.stdin.write(line)
finally:
    bzreader.terminate() # in case of error
    if bzwriter:
        bzwriter.stdin.close()
        bzwriter.wait()
        bzfile.close()
    bzreader.wait()
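For reference, here is how the out_file_name_fmt template expands, using the file name from the question as an example input:

```python
from pathlib import Path

# the first "." splits the stem from the remaining extensions
in_file_name = Path("logging.abc_gps.bz2")
fmt = "{}-{{}}.{}".format(*in_file_name.name.split(".", maxsplit=1))
print(fmt)                        # logging-{}.abc_gps.bz2
print(fmt.format("2021-05-01"))  # logging-2021-05-01.abc_gps.bz2
```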

Upvotes: 1
