Sonu Mishra

Reputation: 1779

Multithreading in Python script

I have a few datasets in the form of numpy arrays, vectors, and dictionaries. Let's call them Di, i = 1..4. Besides these, I have a CSV file F1.csv that has only one column.

I have written a Python script P that reads rows from F1.csv. For each row, using read operations on the Di, it generates some data that has to be written to F2.csv. This code works fine and gives the expected results.

However, the Di are huge and P uses only one CPU core. How can I make P use two cores -- one core for the first half of F1.csv and the other for the second half?

My code is too complex to post here, so I am giving a toy version:

# Code for generating D1
  ...
# Code for generating D2
  ...
# Code for generating D3
  ...
# Code for generating D4
  ...

# P starts

import csv

F1 = csv.reader(open('data/F1.csv'), delimiter='\t')
F2 = open('data/F2.csv', 'wb')

for row in F1:
    toBeWritten = { ... some read operations on Di ... } #detailed code is given below in Edit 2

    F2.write(toBeWritten)

# P ends

How can I modify the code between "# P starts" and "# P ends" so that the threads read disjoint rows from F1.csv, calculate toBeWritten independently, and then write to F2.csv?

I am new to Python multithreading, so an answer that simply modifies my code to accomplish the task would be highly appreciated.

Edit 1:

Please note that the bottleneck is generating the toBeWritten value for each row written to F2. D1 is a 1.5M x 1.3M sparse matrix, read as a scipy.sparse.lil_matrix. Generating toBeWritten involves adding some rows of this matrix, and this addition is the real culprit!

Edit 2:

The actual code for generating toBeWritten is included below.

# D1 is a 1.5M x 1.3M sparse matrix, read as a scipy.sparse.lil_matrix.
# D2 is a 1.5M x 111 matrix, read as a numpy.array
for row in F1:
    user_id = int(row[0])                      # csv fields arrive as strings
    clust = D2[user_id, 110]                   # cluster of this user
    neighbors = D2[D2[:, 110] == clust][:, 1]  # users in the same cluster
    score = np.zeros(1300000)

    for neigh in neighbors:
        score = score + D1[neigh, :]  # the most expensive operation

    toBeWritten = np.argsort(score)[:, ::-1].A[0, :]
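
For reference, a minimal sketch of the same neighbor sum done as one vectorized operation on a CSR copy of D1 (this assumes the tocsr() conversion fits in memory; csr_matrix row slicing and summation is much faster than repeated lil_matrix row additions):

# Sketch, assuming D1.tocsr() fits in memory: sum all neighbor rows at once.
D1_csr = D1.tocsr()
idx = neighbors.astype(int)                    # row indices must be integers
score = D1_csr[idx, :].sum(axis=0)             # 1 x 1.3M numpy matrix
toBeWritten = np.argsort(score.A[0, :])[::-1]  # column indices, highest score first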

Upvotes: 1

Views: 180

Answers (1)

Schore

Reputation: 905

Not sure if threads can help with your specific problem, but here you go:

# Code for generating D1
  ...
# Code for generating D2
  ...
# Code for generating D3
  ...
# Code for generating D4
  ...

# P starts

import csv
import threading

with open('data/F1.csv', 'rb') as csv_file, open('data/F2.csv', 'wb') as F2:
    F1 = csv.reader(csv_file, delimiter='\t')
    result = list()

    def do_work(lines):
        for line in lines:
            toBeWritten = { ... some read operations on Di ... }
            result.append(toBeWritten)  # append is atomic, but results from the two threads interleave, so row order is lost

    data = list(F1)
    half = len(data) // 2  # integer division; len(data)/2 is a float in Python 3
    t0 = threading.Thread(target=do_work, args=(data[:half], ))
    t1 = threading.Thread(target=do_work, args=(data[half:], ))
    t0.start()
    t1.start()
    t0.join()
    t1.join()

    for line in result:
        F2.write(line)

# P ends

You might want to try multiprocessing if threads don't help: CPython's GIL allows only one thread to execute Python bytecode at a time, so CPU-bound work like this usually gains little from threads.
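
For example, a minimal multiprocessing sketch (process_row here is a hypothetical stand-in for the per-row work on the Di; on Unix, datasets built before the pool is created are shared with the worker processes via fork, while on Windows each worker re-imports the module):

import csv
import multiprocessing

def process_row(row):
    # ... some read operations on Di, as in the question ...
    return toBeWritten

if __name__ == '__main__':
    with open('data/F1.csv') as csv_file:
        rows = list(csv.reader(csv_file, delimiter='\t'))

    # Two worker processes; each runs process_row without sharing the GIL.
    pool = multiprocessing.Pool(processes=2)
    try:
        results = pool.map(process_row, rows)  # preserves input row order
    finally:
        pool.close()
        pool.join()

    with open('data/F2.csv', 'w') as F2:
        for line in results:
            F2.write(line)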

Upvotes: 1
