Reputation: 1428
I use multithreading to insert data into a database, but it does not return the correct result. The code is below:
import threading
import queue

class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue

    def run(self):
        print(self.thread_id, ': ', self.queue.get())

def save_to_mongo_with_thread():
    q = queue.Queue()
    for e in range(3):
        for i in range(10):
            q.put([i], block=False)
        threads = []
        for i in range(5):  ##(1)
            threads.append(MongoInsertThread(q, i))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")

save_to_mongo_with_thread()
But the result generated by the code is:
0 : [0]
1 : [1]
2 : [2]
3 : [3]
4 : [4]
+++++++++++++++++++++++
0 : [5]
1 : [6]
2 : [7]
3 : [8]
4 : [9]
+++++++++++++++++++++++
0 : [0]
1 : [1]
2 : [2]
3 : [3]
4 : [4]
+++++++++++++++++++++++
This is not what I wanted; I hoped the result would be:
0 : [0]
1 : [1]
2 : [2]
3 : [3]
4 : [4]
0 : [5]
1 : [6]
2 : [7]
3 : [8]
4 : [9]
+++++++++++++++++++++++
0 : [0]
1 : [1]
2 : [2]
3 : [3]
4 : [4]
0 : [5]
1 : [6]
2 : [7]
3 : [8]
4 : [9]
+++++++++++++++++++++++
0 : [0]
1 : [1]
2 : [2]
3 : [3]
4 : [4]
0 : [5]
1 : [6]
2 : [7]
3 : [8]
4 : [9]
+++++++++++++++++++++++
Maybe something went wrong in the for loop, but I cannot find a solution. What is wrong with my code, and how can I fix it?
Also, when I substitute 11 for 5 at ##(1), the program hangs. How can I deal with that?
Upvotes: 2
Views: 147
Reputation: 4276
How about using a thread pool? Here is my approach:
- Change the MongoInsertThread.run() method to make it run forever until it sees some return point (a None job, e.g.).
- Use a ThreadPool class to manage the MongoInsertThread threads.
- In save_to_mongo_with_threads: start the thread pool -> put jobs to the queue -> stop the thread pool.
Updated: to explain a bit more about the thread pool in this solution, each worker thread (in its MongoInsertThread.run() method):
- takes a job from the queue; if the job is None -> break the forever loop (i.e. stop the current thread)
- otherwise (the job is not None) -> processes the job.
Code:
import threading
import queue
import time

class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue

    def run(self):
        while True:
            job = self.queue.get()
            if job is None:
                return
            print(self.thread_id, ": ", job)

class ThreadPool:
    def __init__(self, queue, thread_count):
        self._queue = queue
        self._thread_count = thread_count
        self._workers = []
        for thread_id in range(thread_count):
            worker_thrd = MongoInsertThread(queue, thread_id)
            self._workers.append(worker_thrd)

    def start(self):
        for worker_thrd in self._workers:
            worker_thrd.start()

    def stop(self):
        # put None job, each worker thread picks one then stops
        for worker_thrd in self._workers:
            self._queue.put(None)
        # wait for worker threads
        for worker_thrd in self._workers:
            worker_thrd.join()

def save_to_mongo_with_threads():
    q = queue.Queue()
    pool = ThreadPool(q, 5)
    pool.start()
    time.sleep(1.0)
    for e in range(3):
        for i in range(10):
            q.put([e, i])
        print("+++++++++++++++++++++++")
        time.sleep(1.0)
    pool.stop()

save_to_mongo_with_threads()
Note: Jobs might not be evenly distributed among the threads. One possible output:
+++++++++++++++++++++++
0 : [0, 0]
2 : [0, 1]
4 : [0, 3]
1 : [0, 4]
0 : [0, 5]
2 : [0, 6]
4 : [0, 7]
3 : [0, 2]
+++++++++++++++++++++++
2 : [0, 9]
0 : [0, 8]
+++++++++++++++++++++++
2 : [1, 2]
0 : [1, 3]
4 : [1, 0]
4 : [1, 7]
1 : [1, 4]
2 : [1, 5]
4 : [1, 8]
1 : [1, 9]
2 : [2, 0]
4 : [2, 1]
1 : [2, 2]
2 : [2, 3]
4 : [2, 4]
3 : [1, 1]
2 : [2, 6]
4 : [2, 7]
3 : [2, 8]
2 : [2, 9]
1 : [2, 5]
0 : [1, 6]
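As an aside, the standard library's concurrent.futures.ThreadPoolExecutor implements the same worker-pool pattern without an explicit None sentinel; a minimal sketch (insert_to_mongo is just a placeholder name, not from the code above):

from concurrent.futures import ThreadPoolExecutor

def insert_to_mongo(job):
    # placeholder for the real Mongo insert; here we just print the job
    print("processing", job)

jobs = [[e, i] for e in range(3) for i in range(10)]

# the executor owns the worker threads and the internal job queue
with ThreadPoolExecutor(max_workers=5) as pool:
    list(pool.map(insert_to_mongo, jobs))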
Upvotes: 3
Reputation:
The output you are seeing is a result of how you populate the queue. Consider what the block
for i in range(10):
    q.put([i], block=False)
is doing: it queues 10 new values onto q. Subsequently the block
for i in range(5):
    threads.append(MongoInsertThread(q, i))
passes the queue to 5 threads, each of which calls get() once, for a total of 5 calls. So at the thread-creation loop (marked ##(1)) in the first iteration of e, q is comprised of
[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
After the threads complete it contains
[5], [6], [7], [8], [9]
After which 10 more values are queued, yielding
[5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
5 values are then again removed, leaving
[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
After which another 10 values are queued:
[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
Followed by the last removal, which leaves
[5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
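A quick way to see this accumulation for yourself is to print the queue size after each pass; a minimal sketch of just the queue bookkeeping (no threads involved):

import queue

q = queue.Queue()
for e in range(3):
    for i in range(10):
        q.put([i], block=False)   # 10 items added per pass
    for _ in range(5):
        q.get()                   # but only 5 items removed per pass
    print("items left after pass", e, ":", q.qsize())
# prints 5, then 10, then 15 - the leftovers described above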
If you want 10 elements removed every time, you must have 10 calls to get(); with the setup you have, that requires 10 threads, i.e. changing the thread loop to
for i in range(10):
    threads.append(MongoInsertThread(q, i))
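Applied to the question's function, that change looks like this (a sketch that keeps the rest of the posted code unchanged):

def save_to_mongo_with_thread():
    q = queue.Queue()
    for e in range(3):
        for i in range(10):
            q.put([i], block=False)
        threads = []
        for i in range(10):   # one thread per queued item
            threads.append(MongoInsertThread(q, i))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")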
However, this isn't necessarily the most efficient approach, since for 100 elements you would have to create 100 threads. It is better to create a smaller number of threads and have each of them call get() multiple times, like so:
import sys
import threading
import queue

class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id, m):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue
        self.m = m          # number of items this thread should consume

    def run(self):
        for i in range(self.m):
            # Using sys.stdout.write keeps the output lines from getting garbled
            sys.stdout.write(str(self.thread_id) + ': ' + str(self.queue.get()) + "\n")

def save_to_mongo_with_thread():
    q = queue.Queue()
    n1 = 11   # items queued per pass
    n2 = 5    # number of threads
    for e in range(3):
        for i in range(n1):
            q.put([i], block=False)
        threads = []
        # Caution - if n1 < n2 this will fail
        for i in range(n2 - 1):
            threads.append(MongoInsertThread(q, i, n1 // n2))
        threads.append(MongoInsertThread(q, n2 - 1, n1 // n2 + (n1 % n2)))  # Handles (n1 % n2 != 0)
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")

save_to_mongo_with_thread()
Sample output (using n1 = 11, not n1 = 10):
0: [0]
0: [1]
1: [2]
1: [3]
2: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
0: [0]
0: [1]
1: [2]
1: [3]
2: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
0: [0]
0: [1]
1: [2]
2: [3]
1: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
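As an aside, if splitting the item count exactly across threads is not important, queue.Queue's own task_done()/join() methods let a few long-lived workers drain the whole queue without the n1/n2 arithmetic; a minimal sketch (worker is an illustrative name, not from the code above):

import queue
import sys
import threading

def worker(q, thread_id):
    while True:
        item = q.get()
        if item is None:          # sentinel: stop this worker
            q.task_done()
            return
        sys.stdout.write(str(thread_id) + ': ' + str(item) + "\n")
        q.task_done()             # mark the item as processed

q = queue.Queue()
for i in range(11):
    q.put([i])

threads = [threading.Thread(target=worker, args=(q, i)) for i in range(5)]
for t in threads:
    t.start()

q.join()                          # blocks until every queued item is marked done
for _ in threads:
    q.put(None)                   # one sentinel per worker to shut it down
for t in threads:
    t.join()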
Upvotes: 1