Liang Shuyuan

Reputation: 25

What is the best way to extract information from multiple gz files into a single csv file?

I am working with OpenAlex data, which comes as multiple gz files. I need to translate the JSON-like data into CSV format.

For example, one line of the original file looks like this:

{"id": "https://openalex.org/A4380414832", "orcid": null, "display_name": "Tazi, Faiza", "display_name_alternatives": [], "updated": "2023-06-13"}

and I need to flatten it like this:

id,orcid,display_name,display_name_alternatives,works_count,cited_by_count,last_known_institution,works_api_url,updated_date
https://openalex.org/A5110860965,,I.L. Chaikoff,"[""I Chaikoff"", ""I.L Chaikoff"", ""I. L CHAIKOFF"", ""I L. CHAIKOFF"", ""I. L. 

Please ignore any inconsistencies in the sample data.
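To make the target concrete, flattening one line basically comes down to something like the sketch below. This is only an illustration, not my actual code: the column list is shortened and flatten_line is a made-up helper name.

import json

# shortened column list for illustration -- the real one has more fields
COLUMNS = ['id', 'orcid', 'display_name', 'display_name_alternatives', 'updated']

def flatten_line(line):
    # turn one JSON line into a dict that csv.DictWriter can write
    record = json.loads(line)
    row = {key: record.get(key) for key in COLUMNS}
    # list-valued fields are serialised back to JSON so they fit in one CSV cell
    row['display_name_alternatives'] = json.dumps(record.get('display_name_alternatives') or [])
    return row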

In its documentation, OpenAlex uses a simple loop to extract all the gz files into a single csv file, which works, but is too slow for tens of GB of data.
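That baseline has roughly the following shape (a simplified sketch, not the exact script from the documentation; the glob path is a placeholder and flatten_line is the helper sketched above):

import csv
import glob
import gzip

# single-process baseline: stream every gz file line by line into one csv.gz
with gzip.open('authors.csv.gz', 'wt', encoding='utf-8') as outfile:
    writer = csv.DictWriter(outfile, fieldnames=COLUMNS)
    writer.writeheader()
    for file_path in glob.glob('data/authors/*/*.gz'):  # placeholder path
        with gzip.open(file_path, 'rt', encoding='utf-8') as infile:
            for line in infile:
                writer.writerow(flatten_line(line))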

I tried to accelerate the flattening with multiprocessing. Since the slowest part is looping over every line of the files, I use a producer-consumer model: one process for reading, two processes for the per-line processing, and three processes for writing (because I need three output files for the different parts). My worker functions look like this (a simplified sketch of how they are wired together follows after the code):

import csv
import gzip
from itertools import islice


def reader(file_paths, data_queues, coordinator_queue, chunk_size):
    queue_index = 0

    for file_path in file_paths:
        with gzip.open(file_path, "rt", encoding='utf-8') as infile:
            while True:
                data_chunk = list(islice(infile, chunk_size))
                if not data_chunk:
                    break

                # hand the chunks to the filter processes in round-robin order
                data_queues[queue_index].put(data_chunk)
                queue_index = (queue_index + 1) % len(data_queues)

    # every file has been read: tell each filter and the coordinator we are done
    for data_queue in data_queues:
        data_queue.put('DONE')
    coordinator_queue.put('READ_DONE')


def filter(data_queue, authors_queue, authors_ids_queue, counts_queue, coordinator_queue):
    while True:
        data_chunk = data_queue.get()
        if data_chunk == 'DONE':
            coordinator_queue.put('FILTER_DONE')
            break
        
        authors = []
        authors_ids = []
        counts = []
        for line in data_chunk:
            # parse the JSON line and append its fields to
            # authors, authors_ids and counts (details omitted here)
            pass
        authors_queue.put(authors)
        authors_ids_queue.put(authors_ids)
        counts_queue.put(counts)


def write_to_gz(queue, file_path, columns, coordinator_queue):
    with gzip.open(file_path, 'wt', encoding='utf-8') as outfile:
        writer = csv.DictWriter(outfile, fieldnames=columns)
        writer.writeheader()

        while True:
            data = queue.get()
            if data == 'DONE':
                coordinator_queue.put('WRITE_DONE')
                break
            writer.writerows(data)

def coordinator(coordinator_queue, authors_queue, authors_ids_queue, counts_queue):
    active_readers = 1
    active_filters = 2
    active_writers = 3

    while True:
        queue_message = coordinator_queue.get()
        if queue_message == 'READ_DONE':
            active_readers -= 1
            if active_readers == 0:
                print("reader done")
        elif queue_message == 'FILTER_DONE':
            active_filters -= 1

            if active_filters == 0:
                while not authors_queue.qsize() == 0:
                    continue
                authors_queue.put('DONE')
                authors_ids_queue.put('DONE')
                counts_queue.put('DONE')
                print("All filters done.")
        elif queue_message == 'WRITE_DONE':
            active_writers -= 1
            print(f"Writer done. Active writers: {active_writers}")
            if active_writers == 0:
                print("All writers done.")
                break
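For context, the processes and queues are wired together roughly like this. It is a simplified sketch rather than the exact code from the repo: the column constants, output file names and chunk size are placeholders.

from multiprocessing import Process, Queue

def main(file_paths):
    # one data queue per filter process
    data_queues = [Queue(), Queue()]
    authors_queue, authors_ids_queue, counts_queue = Queue(), Queue(), Queue()
    coordinator_queue = Queue()

    processes = [
        Process(target=reader,
                args=(file_paths, data_queues, coordinator_queue, 1000)),
        *[Process(target=filter,
                  args=(q, authors_queue, authors_ids_queue,
                        counts_queue, coordinator_queue))
          for q in data_queues],
        Process(target=write_to_gz,
                args=(authors_queue, 'authors.csv.gz',
                      AUTHOR_COLUMNS, coordinator_queue)),
        Process(target=write_to_gz,
                args=(authors_ids_queue, 'authors_ids.csv.gz',
                      IDS_COLUMNS, coordinator_queue)),
        Process(target=write_to_gz,
                args=(counts_queue, 'counts.csv.gz',
                      COUNTS_COLUMNS, coordinator_queue)),
    ]
    for p in processes:
        p.start()

    # the coordinator runs in the main process and shuts the writers down
    coordinator(coordinator_queue, authors_queue, authors_ids_queue, counts_queue)

    for p in processes:
        p.join()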

You can see the complete code on the GitHub page.

The problem is that this program performs well on small files, but it took 4 hours to process the whole dataset, which is even slower than the single-process loop.

From my observation, the program runs efficiently for the first half hour: CPU usage stays high and RAM usage keeps rising until memory is full, but after that the CPU usage of every process drops, and so does the RAM usage.
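If the queues are created without a maxsize (which would match the RAM growth I see), the reader can keep piling chunks into memory faster than the filters and writers drain them. Bounded queues would look like the snippet below (maxsize=10 is an arbitrary value), but I am not sure whether that alone explains the slowdown.

from multiprocessing import Queue

# bounded queues: put() blocks once `maxsize` chunks are waiting,
# so the reader cannot run arbitrarily far ahead of the filters and writers
data_queues = [Queue(maxsize=10), Queue(maxsize=10)]
authors_queue = Queue(maxsize=10)
authors_ids_queue = Queue(maxsize=10)
counts_queue = Queue(maxsize=10)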

Because writing the files has to stay process-safe, it is hard to use a process pool here.

So what is the best way to handle this problem?

Upvotes: 0

Views: 34

Answers (0)
