Reputation: 1779
I have a few datasets in the form of numpy arrays, vectors, and dictionaries. Let's call them Di, i = 1..4. Apart from these, I have a CSV file F1.csv that has only one column.
I have written a Python script P that reads rows from F1.csv. For each row, using read operations on the Di, it generates some data that has to be written to F2.csv. This code is working fine and giving the expected results.
However, the Di are huge and P uses only one CPU core. How can I make P use two cores -- one core for the first half of F1.csv and the other for the second half?
My code is too complex to reproduce here, so I am giving a toy version:
# Code for generating D1
...
# Code for generating D2
...
# Code for generating D3
...
# Code for generating D4
...
# P starts
F1 = csv.reader(open('data/F1.csv'), delimiter='\t')
F2 = open('data/F2.csv', 'wb')
for row in F1:
    toBeWritten = { ... some read operations on Di ... }  # detailed code is given below in Edit 2
    F2.write(toBeWritten)
# P ends
How can I modify the code between "# P starts" and "# P ends" so that the threads read disjoint rows from F1.csv, calculate toBeWritten independently, and then write to F2.csv?
I am new to Python multi-threading, so an answer that simply modifies my code to accomplish the task will be highly appreciated.
Edit 1:
Please note that the bottleneck is generating the toBeWritten value for each row of F2. D1 is a 1.5M x 1.3M sparse matrix, loaded as a scipy.sparse.lil_matrix. Generating toBeWritten involves adding up some rows of this matrix. This addition is the real culprit!
Edit 2:
Actual code for generating toBeWritten is now included in the code below.
# D1 is a 1.5M x 1.3M sparse matrix, read as scipy.sparse.lil_matrix.
# D2 is a 1.5M x 111 matrix, read as numpy.array
# for row in F1:
user_id = row[0]
clust = D2[user_id, 110]                    # cluster id of this user
neighbors = D2[D2[:, 110] == clust][:, 1]   # ids of all users in the same cluster
score = np.zeros(1300000)
for neigh in neighbors:
    score = score + D1[neigh, :]            # the most expensive operation
toBeWritten = np.argsort(score)[:, ::-1].A[0, :]  # column indices sorted by descending score
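In other words, the inner loop just computes the sum of the selected rows of D1. Written as a single indexed sum it would look roughly like the sketch below (a sketch only, assuming D1 is first converted to CSR and neighbors is an integer index array):
D1_csr = D1.tocsr()                        # one-time conversion from lil_matrix
score = D1_csr[neighbors, :].sum(axis=0)   # same row sum as the loop above, as a 1 x 1.3M matrix
toBeWritten = np.argsort(score)[:, ::-1].A[0, :]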
Upvotes: 1
Views: 180
Reputation: 905
Not sure whether threads can help with your specific problem (CPU-bound Python code is limited by the GIL), but here you go:
# Code for generating D1
...
# Code for generating D2
...
# Code for generating D3
...
# Code for generating D4
...
# P starts
import csv
import threading

with open('data/F1.csv', 'rb') as csv_file, open('data/F2.csv', 'wb') as F2:
    F1 = csv.reader(csv_file, delimiter='\t')

    def do_work(lines, result):
        # each thread fills its own result list, so the original row order is preserved
        for line in lines:
            toBeWritten = { ... some read operations on Di ... }
            result.append(toBeWritten)

    data = list(F1)
    half = len(data) // 2   # integer division, works on Python 2 and 3
    results = [[], []]
    t0 = threading.Thread(target=do_work, args=(data[:half], results[0]))
    t1 = threading.Thread(target=do_work, args=(data[half:], results[1]))
    t0.start()
    t1.start()
    t0.join()
    t1.join()

    for line in results[0] + results[1]:
        F2.write(line)
# P ends
You might want to try multiprocessing if threads don't help.
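If you do go the multiprocessing route, a minimal sketch could look like the following. Assumptions: the worker processes inherit the already-built Di (e.g. via fork on Linux), and compute_row is a hypothetical helper standing in for the read operations on Di from Edit 2.
import csv
import multiprocessing

def compute_row(row):
    # hypothetical worker -- replace this body with the actual read
    # operations on Di shown in Edit 2 of the question
    toBeWritten = None
    return toBeWritten

if __name__ == '__main__':
    with open('data/F1.csv', 'rb') as csv_file:
        rows = list(csv.reader(csv_file, delimiter='\t'))

    pool = multiprocessing.Pool(processes=2)   # one worker per core
    results = pool.map(compute_row, rows)      # map preserves the original row order
    pool.close()
    pool.join()

    with open('data/F2.csv', 'wb') as F2:
        for line in results:
            F2.write(line)
Unlike threads, the two worker processes are not limited by the GIL, at the cost of pickling each row and its result between processes.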
Upvotes: 1