user3249433

Reputation: 601

Processing huge CSV file using Python and multithreading

I have a function that yields lines from a huge CSV file lazily:

def get_next_line():
    with open(sample_csv,'r') as f:
        for line in f:
            yield line

def do_long_operation(row):
    print('Do some operation that takes a long time')

I need to use threads so that I can call do_long_operation on each record that the function above yields.

Most examples on the Internet look something like this, and I am not sure whether I am on the right path.

import threading
thread_list = []
for i in range(8):
    t = threading.Thread(target=do_long_operation, args=(get_next_row from get_next_line))
    thread_list.append(t)

for thread in thread_list:
    thread.start()

for thread in thread_list:
    thread.join()

My questions are:

  1. How do I start only a finite number of threads, say 8?

  2. How do I make sure that each of the threads will get a row from get_next_line?

Upvotes: 4

Views: 15342

Answers (3)

Hannu

Reputation: 12205

You could use a thread pool from multiprocessing and map your tasks to a pool of workers:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
from random import randint
from time import sleep


def process_line(l):
    # Stand-in for a long-running operation on one line
    print("{} started".format(l))
    sleep(randint(0, 3))
    print("{} done".format(l))


def get_next_line():
    with open("sample.csv", 'r') as f:
        for line in f:
            yield line

f = get_next_line()

t = Pool(processes=8)

for i in f:
    # Submit each line without blocking; whichever worker is free picks it up.
    t.apply_async(process_line, (i,))
t.close()
t.join()

This will create eight workers and submit your lines to them, one by one. As soon as a worker is free, it picks up the next task.

There is a commented-out import statement, too. If you import Pool from multiprocessing instead of ThreadPool, you will get subprocesses instead of threads, which may be more efficient in your case.
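
A minimal sketch of that process-based variant, assuming the same sample.csv and placeholder process_line as above (the if __name__ == "__main__" guard is needed because a process Pool starts new interpreters on platforms such as Windows):

from multiprocessing import Pool
from random import randint
from time import sleep


def process_line(l):
    print("{} started".format(l))
    sleep(randint(0, 3))
    print("{} done".format(l))


def get_next_line():
    with open("sample.csv", "r") as f:
        for line in f:
            yield line


if __name__ == "__main__":
    with Pool(processes=8) as pool:
        # imap_unordered hands lines to whichever worker is free
        # and yields results as soon as each one finishes.
        for _ in pool.imap_unordered(process_line, get_next_line()):
            pass

Unlike map, which collects every result into one list, imap_unordered yields results as they complete.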

Upvotes: 12

KB5

Reputation: 21

Using a Pool/ThreadPool from multiprocessing to run a pool of workers, and a Queue with a maxsize to control how many tasks are held in memory (so we don't read too far ahead into the huge CSV file when the workers are slow):

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool
from random import randint
import time, os
from multiprocessing import Queue


def process_line(l):
    print("{} started".format(l))
    time.sleep(randint(0, 3))
    print("{} done".format(l))


def get_next_line():
    with open(sample_csv, 'r') as f:
        for line in f:
            yield line

# use for testing
# def get_next_line():
#     for i in range(100):
#         print('yielding {}'.format(i))
#         yield i


def worker_main(queue):
    print("{} working".format(os.getpid()))
    while True:
        # Get item from queue, block until one is available
        item = queue.get(True)
        if item is None:
            # Shutdown this worker and requeue the item so other workers can shutdown as well
            queue.put(None)
            break
        else:
            # Process item
            process_line(item)
    print("{} done working".format(os.getpid()))


f = get_next_line()

# Use a multiprocessing queue with maxsize
q = Queue(maxsize=5)

# Start workers to process queue items
t = Pool(processes=8, initializer=worker_main, initargs=(q,))

# Enqueue items. This blocks if the queue is full.
for l in f:
    q.put(l)

# Enqueue the shutdown message (i.e. None)
q.put(None)

# We need to first close the pool before joining
t.close()
t.join()
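
For comparison, a minimal sketch of the same bounded-queue idea using plain threading.Thread and queue.Queue from the standard library, with one sentinel per worker instead of re-queueing a single None (the file name, worker count, and process_line body here are illustrative):

import threading
import queue


def process_line(line):
    print("{} processed by {}".format(line.strip(), threading.current_thread().name))


def worker(q):
    while True:
        item = q.get()
        if item is None:
            # Sentinel received: shut down this worker
            break
        process_line(item)


q = queue.Queue(maxsize=5)  # bounds how far ahead of the workers we read
workers = [threading.Thread(target=worker, args=(q,)) for _ in range(8)]
for w in workers:
    w.start()

with open("sample.csv", "r") as f:
    for line in f:
        q.put(line)  # blocks while the queue is full

for _ in workers:
    q.put(None)  # one sentinel per worker
for w in workers:
    w.join()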

Upvotes: 2

hasherBaba

Reputation: 339

Hannu's answer is not the best method. I ran that code on a 100M-row CSV file, and it took forever to complete.

However, prior to reading his answer, I had written the following code:

import csv
from multiprocessing import Pool
import time
import datetime


def call_processing_rows_pickably(row):
    # Thin module-level wrapper passed to Pool.map below
    process_row(row)

def process_row(row):
    row_to_be_printed = str(row)+str("hola!")
    print(row_to_be_printed)

class process_csv():

    def __init__(self, file_name):
        self.file_name = file_name

    def get_row_count(self):
        with open(self.file_name) as f:
            for i, l in enumerate(f):
                pass
        self.row_count = i + 1  # enumerate() starts at 0

    def select_chunk_size(self):
        if self.row_count > 10000000:
            self.chunk_size = 100000
            return
        if self.row_count > 5000000:
            self.chunk_size = 50000
            return
        self.chunk_size = 10000
        return

    def process_rows(self):
        list_de_rows = []
        count = 0
        with open(self.file_name, 'r', newline='') as file:
            reader = csv.reader(file)
            for row in reader:
                count += 1
                print(count)
                list_de_rows.append(row)
                if len(list_de_rows) == self.chunk_size:
                    p.map(call_processing_rows_pickably, list_de_rows)
                    del list_de_rows[:]
            if list_de_rows:
                # Process the final chunk that did not reach chunk_size
                p.map(call_processing_rows_pickably, list_de_rows)

    def start_process(self):
        self.get_row_count()
        self.select_chunk_size()
        self.process_rows()

initial = datetime.datetime.now()
p = Pool(4)
ob = process_csv("100M_primes.csv")
ob.start_process()
final = datetime.datetime.now()
print(final-initial)

This took 22 minutes. Obviously, there is room for improvement; for comparison, fread in R takes at most 10 minutes for the same task.

The difference is that I build a chunk of 100k rows first and then pass it to a function that is mapped across the pool (here, a multiprocessing Pool with 4 workers).
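
A condensed sketch of that chunk-then-map pattern, using itertools.islice to build the chunks (the chunk size and file name are taken from the code above, and process_row stands in for the real per-row work):

import csv
from itertools import islice
from multiprocessing import Pool


def process_row(row):
    print(str(row) + "hola!")


def chunks(reader, size):
    # Yield lists of up to size rows until the reader is exhausted.
    while True:
        chunk = list(islice(reader, size))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    with Pool(4) as pool, open("100M_primes.csv", "r", newline="") as f:
        for chunk in chunks(csv.reader(f), 100000):
            pool.map(process_row, chunk)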

Upvotes: -1
