littlely

Reputation: 1428

Python multithreading does not output the complete result

I use multithreading to insert data into a database, but it does not return the correct result. The code is below:

class MongoInsertThread(threading.Thread):
    def __init__(self,  queue, thread_id):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue

    def run(self):
        print(self.thread_id,': ', self.queue.get())

def save_to_mongo_with_thread():
    q = queue.Queue()

    for e in range(3):
        for i in range(10):
            q.put([i], block=False)
        threads = []
        for i in range(5): ##(1)
            threads.append(MongoInsertThread(q, i))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")

but the output of this code is:

0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
+++++++++++++++++++++++
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++
0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
+++++++++++++++++++++++

which is not what I want. I expected the result to be:

0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++
0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++
0 :  [0]
1 :  [1]
2 :  [2]
3 :  [3]
4 :  [4]
0 :  [5]
1 :  [6]
2 :  [7]
3 :  [8]
4 :  [9]
+++++++++++++++++++++++

Maybe something is wrong in the for loop, but I cannot find a solution. What is wrong with my code, and how can I fix it? Also, when I replace 5 with 11 at ##(1), the program hangs. How can I deal with that?

Upvotes: 2

Views: 147

Answers (2)

duong_dajgja

Reputation: 4276

How about using a thread pool? Here is my approach:

  • Change your MongoInsertThread.run() method so that it runs forever until it sees a stop signal (e.g. a None job).
  • Create a thread pool of MongoInsertThread threads.
  • Update save_to_mongo_with_threads: start the thread pool -> put jobs into the queue -> stop the thread pool.

Update: a bit more explanation of the thread pool in this solution

  • A thread pool is a collection of threads.
  • The threads within a thread pool share the same job queue.
  • Each thread runs forever (the MongoInsertThread.run() method):
    • (1) get a job from the shared queue.
    • (2) if the job is None -> break out of the loop (i.e. stop the current thread).
    • (3) otherwise -> process the job.
    • (4) go to (1).

Code:

import threading
import queue
import time


class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue

    def run(self):
        while True:
            job = self.queue.get()
            if job is None:
                return
            print(self.thread_id, ": ", job)


class ThreadPool:
    def __init__(self, queue, thread_count):
        self._queue = queue
        self._thread_count = thread_count
        self._workers = []
        for thread_id in range(thread_count):
            worker_thrd = MongoInsertThread(queue, thread_id)
            self._workers.append(worker_thrd)

    def start(self):
        for worker_thrd in self._workers:
            worker_thrd.start()

    def stop(self):
        # put None job, each worker thread picks one then stops
        for worker_thrd in self._workers:
            self._queue.put(None)
        # wait for worker threads
        for worker_thrd in self._workers:
            worker_thrd.join()


def save_to_mongo_with_threads():
    q = queue.Queue()

    pool = ThreadPool(q, 5)
    pool.start()

    time.sleep(1.0)

    for e in range(3):
        for i in range(10):
            q.put([e, i])
        print("+++++++++++++++++++++++")

    time.sleep(1.0)

    pool.stop()


save_to_mongo_with_threads()

Note: jobs might not be evenly distributed among the threads. One possible output:

+++++++++++++++++++++++
0 :  [0, 0]
2 :  [0, 1]
4 :  [0, 3]
1 :  [0, 4]
0 :  [0, 5]
2 :  [0, 6]
4 :  [0, 7]
3 :  [0, 2]
+++++++++++++++++++++++
2 :  [0, 9]
0 :  [0, 8]
+++++++++++++++++++++++
2 :  [1, 2]
0 :  [1, 3]
4 :  [1, 0]
4 :  [1, 7]
1 :  [1, 4]
2 :  [1, 5]
4 :  [1, 8]
1 :  [1, 9]
2 :  [2, 0]
4 :  [2, 1]
1 :  [2, 2]
2 :  [2, 3]
4 :  [2, 4]
3 :  [1, 1]
2 :  [2, 6]
4 :  [2, 7]
3 :  [2, 8]
2 :  [2, 9]
1 :  [2, 5]
0 :  [1, 6]
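The code above prints every separator from the main thread without waiting for the workers, so the "+++" lines do not mark batch boundaries. If you want a separator after each fully processed batch, as in the question's expected output, one option (a sketch, not part of the original answer) is to pair every get() with queue.Queue.task_done() and call q.join() after queuing each batch. The shared `log` list and its lock are hypothetical stand-ins for the real MongoDB insert.

```python
import queue
import threading


class MongoInsertThread(threading.Thread):
    """Pool worker: processes jobs until it receives a None sentinel."""

    def __init__(self, jobs, thread_id, log, lock):
        super().__init__()
        self.jobs = jobs
        self.thread_id = thread_id
        self.log = log    # hypothetical: shared list standing in for the MongoDB insert
        self.lock = lock  # guards the shared log

    def run(self):
        while True:
            job = self.jobs.get()
            try:
                if job is None:  # sentinel: stop this worker
                    return
                with self.lock:
                    self.log.append(job)
                print(self.thread_id, ": ", job)
            finally:
                # Every get() is matched by task_done(), so q.join() can return.
                self.jobs.task_done()


def save_to_mongo_with_threads(batches=3, batch_size=10, thread_count=5):
    q = queue.Queue()
    log, lock = [], threading.Lock()
    workers = [MongoInsertThread(q, i, log, lock) for i in range(thread_count)]
    for w in workers:
        w.start()

    for e in range(batches):
        for i in range(batch_size):
            q.put([e, i])
        q.join()  # wait until every job of this batch has been marked done
        print("+++++++++++++++++++++++")
        with lock:
            log.append("+++")  # record where the separator was printed

    for _ in workers:  # one sentinel per worker
        q.put(None)
    for w in workers:
        w.join()
    return log


save_to_mongo_with_threads()
```

The order of jobs within a batch still depends on thread scheduling; only the separators are now guaranteed to follow their batch.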

Upvotes: 3

user10737101

Reputation:

The output you are seeing is a result of how you populate the queue. Consider what the block

for i in range(10):
    q.put([i], block=False)

is doing: it queues 10 new values onto q. Then the block

for i in range(5):
    threads.append(MongoInsertThread(q, i))

passes the queue to 5 threads, each of which calls get() exactly once, for a total of 5 calls. So in the first iteration of e, right after the queue is filled, q contains

[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

After the threads complete it contains

[5], [6], [7], [8], [9]

Then 10 more values are queued, yielding

[5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

Five values are then removed again, leaving

[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

Then another 10 values are queued:

[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]

The last removal then leaves

[5], [6], [7], [8], [9], [0], [1], [2], [3], [4], [5], [6], [7], [8], [9]
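The bookkeeping above can be checked without threads: queue.Queue is FIFO, so five get() calls per round take the same items as five popleft() calls on a collections.deque. A minimal sequential replay, using a hypothetical `simulate` helper:

```python
from collections import deque


def simulate(rounds=3, items_per_round=10, gets_per_round=5):
    """Replay the question's loop without threads: put 10 items, then 5 get() calls."""
    q = deque()
    printed = []    # items the threads would print, one list per round
    remaining = []  # queue contents left over after each round
    for _ in range(rounds):
        q.extend([i] for i in range(items_per_round))                 # q.put([i]) ten times
        printed.append([q.popleft() for _ in range(gets_per_round)])  # one get() per thread
        remaining.append(list(q))
    return printed, remaining


printed, remaining = simulate()
print(printed[2])  # [[0], [1], [2], [3], [4]]
```

The third round prints [0] to [4] again, exactly as in the question's output, and 15 items are still left in the queue at the end.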

If you want 10 elements removed every time, you need 10 calls to get(). With your setup, that requires 10 threads, i.e. changing the thread loop to

for i in range(10):
    threads.append(MongoInsertThread(q, i))

However, this isn't necessarily the most efficient approach, since 100 elements would require 100 threads. It is better to create a smaller number of threads and have each of them call get() multiple times, like so:

import queue
import sys
import threading


class MongoInsertThread(threading.Thread):
    def __init__(self, queue, thread_id, m):
        super(MongoInsertThread, self).__init__()
        self.thread_id = thread_id
        self.queue = queue
        self.m = m  # number of items this thread takes from the queue

    def run(self):
        for i in range(self.m):
            # Using sys.stdout.write keeps the output lines from getting garbled
            sys.stdout.write(str(self.thread_id) + ': ' + str(self.queue.get()) + "\n")

def save_to_mongo_with_thread():
    q = queue.Queue()
    n1 = 11  # items queued per round
    n2 = 5   # threads per round

    for e in range(3):
        for i in range(n1):
            q.put([i], block=False)
        threads = []
        # Caution: if n1 < n2, the first n2-1 threads take nothing and the last one takes all n1 items
        for i in range(n2 - 1):
            threads.append(MongoInsertThread(q, i, n1 // n2))
        # The last thread also takes the remainder (handles n1 % n2 != 0)
        threads.append(MongoInsertThread(q, n2 - 1, n1 // n2 + n1 % n2))
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        print("+++++++++++++++++++++++")

save_to_mongo_with_thread()

Sample output (using n1 = 11 rather than n1 = 10):

0: [0]
0: [1]
1: [2]
1: [3]
2: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
0: [0]
0: [1]
1: [2]
1: [3]
2: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
0: [0]
0: [1]
1: [2]
2: [3]
1: [4]
2: [5]
3: [6]
3: [7]
4: [8]
4: [9]
4: [10]
+++++++++++++++++++++++
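As for the hang mentioned in the question when 5 is replaced by 11 at ##(1): the first round queues only 10 items, so the eleventh thread's get() blocks indefinitely (Queue.get() blocks by default) and its join() never returns. One way to let surplus workers exit is to pass a timeout to get() and stop on queue.Empty. The sketch below assumes all items are queued before the workers start; the `drain` and `run` helpers and the 0.5-second timeout are hypothetical choices for illustration.

```python
import queue
import threading


def drain(q, thread_id, out, lock, timeout=0.5):
    """Hypothetical worker: take items until the queue stays empty for `timeout` seconds."""
    while True:
        try:
            item = q.get(timeout=timeout)  # raises queue.Empty instead of blocking forever
        except queue.Empty:
            return
        with lock:
            out.append((thread_id, item))


def run(n_items=10, n_threads=11):
    q = queue.Queue()
    for i in range(n_items):
        q.put([i])
    out, lock = [], threading.Lock()
    threads = [threading.Thread(target=drain, args=(q, tid, out, lock))
               for tid in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # returns even though there are more threads than items
    return out


print(len(run()))  # 10
```

A timeout is a heuristic: if a producer keeps adding items more slowly than the timeout, workers may exit too early, and the None-sentinel approach from the other answer is the more robust way to stop them.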

Upvotes: 1
