Anubhav Singh

Reputation: 482

Limit the number of active threads in Python

I have a sequential producer-consumer model that takes a long time to execute, so I am trying to make the consumer code run concurrently.

Note: objects is a generator.

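def report_object(self, object_type, objects):
    reported = False
    for obj in objects:
        reported = True
        try:
            change_handler(obj, self.config)
        except Exception as e:
            LOG.error("Error occurred in handling object: %s", e)
            LOG.exception(e)
    if not reported:
        # A for/else branch would run after every loop that ends without break,
        # so an explicit flag is used to detect an empty generator.
        LOG.info(" Consumer: no objects reported")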

Threaded implementation of the above function:

import threading

def report_object(self, object_type, objects):
    threads = []
    for obj in objects:
        try:
            # One thread per object, so the thread count grows with the input.
            t = threading.Thread(target=change_handler, args=(obj, self.config))
            LOG.info(" ***** Number of active threads: %d *****", threading.active_count())
            t.start()
            threads.append(t)
        except Exception as e:
            # Only catches errors raised while creating or starting the thread,
            # not errors raised inside change_handler.
            LOG.error("Error occurred in handling object: %s", e)
            LOG.exception(e)
    for t in threads:
        t.join()
    if not threads:
        LOG.info(" Consumer: no objects reported")

With this approach I run as many threads as there are objects. If the number of objects becomes very large, say 1,000 or 10,000, what will the impact be? Will there be a race condition? If so, how can I prevent it? I tried another solution:

threads = [ threading.Thread(target=change_handler,args=(obj, self.config)) for _ in range(8)]
for thread in threads:
    thread.start()
    LOG.info(thread.name)
for thread in threads:
    thread.join()

The number of active threads still keeps increasing. What is the best way to restrict the number of active threads and make the above function run concurrently?

Upvotes: 1

Views: 502

Answers (1)

Booboo

Reputation: 44013

The best way of controlling the number of threads is to use the ThreadPoolExecutor class from the concurrent.futures module, and there are several ways of doing this. One way is to use the submit method, which schedules a call on one of the pool's worker threads and returns a Future object representing the eventual completion of that call. If the worker function returns a result, you can call the result method on the Future, which blocks until the call is complete and then returns the value returned from the call; if the call raised an exception, result re-raises it (there are, of course, many other methods you can call on the Future object). You are not obliged to save the Future object if the worker function does not return a value and you do not otherwise need to test for successful completion.

Here is an example of how to use the ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import time, random

def my_thread(n):
    time.sleep(random.random())
    return n, time.time()

MAX_THREADS = 10

with ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
    futures = [e.submit(my_thread, n) for n in range(15)]
    for f in futures:
        print(f.result())

Prints:

(0, 1586782110.1816075)
(1, 1586782109.4404495)
(2, 1586782109.6663365)
(3, 1586782109.8307955)
(4, 1586782109.6733325)
(5, 1586782109.6103601)
(6, 1586782109.3914738)
(7, 1586782109.6803281)
(8, 1586782109.8587916)
(9, 1586782109.7173235)
(10, 1586782110.3664994)
(11, 1586782110.1816075)
(12, 1586782110.518443)
(13, 1586782110.4524374)
(14, 1586782110.0256832)
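
Applied to the function in the question, the same pattern might look like the sketch below. It is only a sketch under assumptions: change_handler here is a hypothetical stand-in for the asker's real handler, the config dictionary and its "factor" key are invented for the demo, the function is written as a plain function rather than a method, and the cap of 8 workers is an arbitrary choice. Calling result on each Future is what surfaces exceptions raised inside the worker, so they can be logged as in the sequential version.

```python
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

LOG = logging.getLogger(__name__)
MAX_THREADS = 8  # assumed cap; tune for the real workload


def change_handler(obj, config):
    # Hypothetical stand-in for the question's change_handler.
    if obj == 3:
        raise ValueError("cannot handle %r" % obj)
    return obj * config["factor"]


def report_objects(objects, config, max_workers=MAX_THREADS):
    """Handle every object using at most max_workers threads.

    Returns a (handled, failed) tuple of counts.
    """
    handled = 0
    failed = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit() queues the call; only max_workers calls run at a time.
        futures = [executor.submit(change_handler, obj, config) for obj in objects]
        for future in as_completed(futures):
            try:
                future.result()  # re-raises any exception from the worker
                handled += 1
            except Exception:
                LOG.exception("Error occurred in handling object")
                failed += 1
    if handled + failed == 0:
        LOG.info("Consumer: no objects reported")
    return handled, failed


if __name__ == "__main__":
    # A generator, as in the question; object 3 fails on purpose.
    print(report_objects((n for n in range(10)), {"factor": 2}))
```

Note that the list comprehension drains the generator up front and queues one pending task per object. Only max_workers threads ever run, but for a very large input the queued tasks still occupy memory, so submitting in batches may be preferable in that case.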

Upvotes: 1
