Reputation: 45
I need to run a parser on .gz files that contain log files for a project I am working on. Once extracted, each log file is roughly 800MB, and each .zip file can contain up to 20 of them. In total, I would need to parse through as much as 20GB of raw text files in a single shot. I have no control over the structure of the log/.gz files as these are downloaded from the company's AWS server.
Within these log files, I need to look for a particular code, and if the code exists within the line, I need to extract the relevant data and save it to a csv file.
Right now, I am searching through the files line by line and, as expected, a single file takes as long as 10 minutes to complete (timed using timeit):
with gzip.open(file_location, 'rb') as f:
    for line in f:
        line_string = line.decode().strip()
        if self.config_dict["log_type"] in line_string:
            log.append(line_string)
Is there any way I can speed up the parsing?
Edit: To give context, this is what a single line of the log file may look like:
8=FIX.4.49=28935=834=109049=TESTSELL152=20180920-18:23:53.67156=TESTBUY16=113.3511=63673064027889863414=3500.000000000015=USD17=2063673064633531000021=231=113.3532=350037=2063673064633531000038=700039=140=154=155=MSFT60=20180920-18:23:53.531150=F151=3500453=1448=BRK2447=D452=110=151
Within this, I am checking for a very specific substring, let's say "155=MSFT", and if there is a match, I will add it to a certain list.
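For reference, a minimal sketch of how a matching line could be split into tag/value pairs and written to a CSV, using the log list from the snippet above; the \x01 field separator and the chosen tag columns are assumptions, not details from the actual logs:
import csv

SOH = "\x01"  # assumed FIX field separator; the raw logs may use a different delimiter

def fix_fields(line_string: str) -> dict:
    # split a raw FIX message into a {tag: value} dict
    pairs = (part.split("=", 1) for part in line_string.strip().split(SOH) if "=" in part)
    return dict(pairs)

with open("extracted.csv", "w", newline="", encoding="utf8") as out:
    writer = csv.writer(out)
    writer.writerow(["52", "55", "31", "32"])  # placeholder tag columns, adjust as needed
    for line_string in log:
        fields = fix_fields(line_string)
        writer.writerow([fields.get("52"), fields.get("55"), fields.get("31"), fields.get("32")])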
Upvotes: 1
Views: 806
Reputation: 224913
I would outsource the work to something faster than Python. zgrep(1) exists exactly for this task:
import subprocess

search_process = subprocess.Popen(
    ["zgrep", "-F", "--", self.config_dict["log_type"], file_location],
    stdin=subprocess.DEVNULL,
    stdout=subprocess.PIPE,
    stderr=subprocess.DEVNULL,
    encoding="utf-8",
)
log.extend(search_process.stdout)
if search_process.wait() != 0:
    raise Exception(f"search process failed with code {search_process.returncode}")
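One thing to keep in mind: grep-family tools, zgrep included, exit with status 1 when no lines matched at all. If an empty result is a legitimate outcome for some files, you may want to relax the check, roughly like this:
status = search_process.wait()
if status not in (0, 1):  # 1 just means "no matches found", which may be acceptable
    raise Exception(f"search process failed with code {status}")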
Upvotes: 1
Reputation: 43840
One example of processing multiple files in parallel with a multiprocessing Pool:
import gzip
from multiprocessing import Pool

def process_log(filepath: str, log_type: str) -> list[str]:
    results = []
    # 'rt' opens the compressed file in text mode so each line is a str
    with gzip.open(filepath, 'rt') as f:
        for line in f:
            if log_type in line:
                results.append(line.strip())
    return results

def process_log_files(log_filepaths: list[str], log_type: str):
    args = [(filepath, log_type) for filepath in log_filepaths]
    with Pool() as pool:
        with open('output.txt', 'w', encoding='utf8') as out:
            for results in pool.starmap(process_log, args):
                for result in results:
                    out.write(result + '\n')
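A hypothetical way to call this, assuming the downloaded .gz files live in a logs/ directory and using the search string from the question:
import glob

if __name__ == '__main__':
    gz_files = glob.glob('logs/*.gz')  # assumed location of the downloaded archives
    process_log_files(gz_files, '155=MSFT')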
Upvotes: 0