prabhuiitdhn

Reputation: 79

How to process items from a thread Queue in Python

            from queue import Queue
            import threading
            import time

            queue1 = Queue()


            # this function should finish its work and restart with fresh queue1
            def association_helper():
                # This function should take all the data from the queue, add it to the list, and print it. Then it should start
                # again with the remaining items in the queue (the items that were inserted while this function was printing).
                lock = threading.Lock()
                lock.acquire()
                items = []
                print("Start..")
                while True:
                    if queue1.qsize()>0:
                        print("Line no 13:", queue1.qsize())
                        SizeofQueue1 = queue1.qsize()
                        for i in range(SizeofQueue1):
                            items.append(queue1.get())
                        queue1.task_done()
                        print("Line no 19:", len(items))
                        print(items)
                        print("Line no 25: done")
                        time.sleep(0.1)
                lock.release()


            i = 0


            def main():
                global i
                # continuous data coming and adding in queue
                while True:
                    queue1.put([i])
                    i += 1


            if __name__ == '__main__':
                # main thread will always run (adding the items in the queue)
                f_thread = threading.Thread(target=association_helper)
                f_thread.daemon = True
                f_thread.start()
                main()


    output:

    Start... 
    Line no 13: 1415 
    Line no 19: 3794 
    Line no 25: done
    Line no 13: 40591 
    Line no 19: 41856 
    Line no 25: done 
    Line no 13: 78526


I expected the sizes printed at line no 13 and line no 19 to be the same. Also, after line no 25, it should print Start.. again (because association_helper should finish its execution and run again).

Why does association_helper run only once? Why does it not finish its work and restart with the remaining items in the queue?

Motivation:

  1. The main thread will keep adding new items to queue1.
  2. When the size of queue1 is greater than 0, association_helper should extract all the data from queue1 and process it.
  3. Meanwhile, adding items to the queue should continue.
  4. Once association_helper finishes its execution, it should start fresh with the new items in the queue.

Upvotes: 1

Views: 2852

Answers (1)

Amiram

Reputation: 1295

Let's start at the end:

as per expectations, the line no 13 and line no 19 should be same.

One thread gets from the queue while another thread puts into it, and no lock is shared between them (the lock created inside association_helper is local to that thread, so it protects nothing). You therefore cannot expect the queue to stay unchanged between two lines of the thread function. That is what you are seeing: queue1.qsize() is called twice, once for the "Line no 13" print and again on the next line to set SizeofQueue1, and the two calls can return different values.
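As a small illustration of the point above, one way to make the printed size and the number of drained items agree is to call qsize() only once and reuse the value. This is a minimal sketch, not the asker's code: `drain_snapshot` is a hypothetical helper name, and it assumes a single consumer thread, so nothing else removes items between the size read and the gets.

```python
from queue import Queue


def drain_snapshot(q):
    """Remove the items present at the moment of the call and return them."""
    size = q.qsize()  # read the size exactly once
    # Assumes a single consumer: nothing else removes items, so at least
    # `size` items stay available and get() does not block.
    batch = [q.get() for _ in range(size)]
    return size, batch


q = Queue()
for n in range(5):
    q.put([n])

size, batch = drain_snapshot(q)
print("size:", size, "drained:", len(batch))
```

Items that arrive after the size read simply stay in the queue for the next call.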

Also, after line no 25, it should print Start.. (because association_helper should finish its execution and run again)

You call print("Start..") before entering the while True loop, so it runs only once. You will not see that output again unless the function is called again.


The following explanations and examples show how to resolve the races between the threads that put to and get from the queue:

Declare the lock as a global variable.

lock1 = threading.Lock()

Using this lock, we can now ensure that the size of the queue and the expected len(items) have the same value.

with lock1:
    print("Line no 13:", queue1.qsize())
    SizeofQueue1 = queue1.qsize()
# and
with lock1:
    queue1.put([i])

This gives the same size in both places, as expected:

Line no 13: 9
Line no 19: 9
[[1], [2], [3], [4], [5], [6], [7], [8], [9]]
Line no 25: done
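To show how the shared lock fits into both threads, here is a self-contained sketch under some assumptions: the producer is bounded to 500 items so the script terminates, and `producer` and `take_batch` are hypothetical names. The consumer holds lock1 for the whole read-and-drain step, which is slightly wider than the fragments above.

```python
import threading
from queue import Queue

queue1 = Queue()
lock1 = threading.Lock()  # module-level, so both threads share the same lock


def producer(n):
    for i in range(n):
        with lock1:
            queue1.put([i])


def take_batch():
    # Holding lock1 for the whole read-and-drain keeps the producer out,
    # so the size cannot change between qsize() and the last get().
    with lock1:
        size = queue1.qsize()
        items = [queue1.get() for _ in range(size)]
    return size, items


t = threading.Thread(target=producer, args=(500,))
t.start()

seen = 0
while seen < 500:
    size, items = take_batch()
    assert size == len(items)  # the two numbers now always agree
    seen += len(items)

t.join()
print("items processed:", seen)
```

Because a new list is built on every call, each batch contains only the items taken in that call.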

Regarding print("Start.."), you can move it inside the while loop so that it is printed on every iteration.

while True:
    print("Start..")
    if queue1.qsize()>0:
        # The rest of the code

Finally, if you want the items list to contain only the items from the current iteration, you need to clear it. If you do not clear the list between iterations, the difference between the two sizes will keep growing.

list.clear() Remove all items from the list. Equivalent to del a[:].

You will end up with:

while True:
    print("Start..")
    items.clear()
    if queue1.qsize()>0:
        # The rest of the code
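Putting the pieces together, the following is one possible complete version, offered as a sketch rather than the definitive fix. It swaps the qsize() check for get_nowait() with queue.Empty, which needs no extra lock because Queue is already thread-safe. The `stop` event, the `batches` list, the `drain` helper and the bounded producer are assumptions added so that the script terminates and its result can be inspected.

```python
import threading
import time
from queue import Queue, Empty

queue1 = Queue()
stop = threading.Event()  # hypothetical shutdown flag, not in the original
batches = []              # kept only so the result can be inspected


def drain(q):
    """Return everything currently in q without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except Empty:
            return items
        q.task_done()  # one task_done() per successful get


def association_helper():
    while not stop.is_set() or not queue1.empty():
        items = drain(queue1)  # a fresh list on every iteration
        if items:
            batches.append(items)
        else:
            time.sleep(0.01)   # avoid spinning on an empty queue


def main(n_items=1000):
    for i in range(n_items):
        queue1.put([i])


worker = threading.Thread(target=association_helper, daemon=True)
worker.start()
main()
queue1.join()  # wait until every item has been marked done
stop.set()
worker.join()

total = sum(len(b) for b in batches)
print("batches:", len(batches), "items:", total)
```

Each batch holds only the items taken in that pass, and task_done() is called once per item, so queue1.join() returns only after everything has been consumed.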

Upvotes: 1
