Reputation: 482
I have sequential producer consumer model which is taking a lot of time to execute. So I am trying to make the consumer code run concurrently.
Note: objects is a generator.
func report_object(self, object_type, objects):
for obj in objects:
try:
change_handler(obj, self.config)
except Exception as e:
LOG.error("Error occurred in handling object: %s" % e)
LOG.exception(e)
else:
LOG.info(" Consumer: no objects reported")
Threaded implementation of the above function:
import threading
func report_object(self, object_type, objects):
threads = []
for obj in objects:
try:
t = threading.Thread(target=change_handler,args=(obj, self.config))
LOG.info(" ***** Number of active threads: %d *****", threading.activeCount())
t.start()
threads.append(t)
except Exception as e:
LOG.error("Error occurred in handling object: %s" % e)
LOG.exception(e)
for t in threads:
t.join()
else:
LOG.info(" Consumer: no objects reported")
If the above mechanism is followed I am running as many threads as len(objects). I this case if the objects become very huge like 1000/10000 then what will be the impact? Will there be a race condition? If yes then how can I prevent this? I tried another solution like:
threads = [ threading.Thread(target=change_handler,args=(obj, self.config)) for _ in range(8)]
for thread in threads:
thread.start()
LOG.info(thread.name)
for thread in threads:
thread.join()
The number of active thread is still increasing. What would be the best way to restrict the number of active threads and best way to make the above function run concurrently.
Upvotes: 1
Views: 502
Reputation: 44013
The best way of controlling the number of threads is to use the ThreadPoolExecutor
from the concurrent.futures
package, and there are several ways of doing this. One way is to use the submit
method, which returns a Future
object representing the future completion of the thread. If the thread returns a result, you can call the result
method on this object which will block until the call is complete and then returns the value returned from the call (there are, of course, many other methods you can call on the Future
object). You are not obliged to save the Future
object if the thread does not return a value or you do not otherwise need to test for successful completion.
Here is an example of how to use the ThreadPoolExecutor
:
from concurrent.futures import ThreadPoolExecutor
import time, random
def my_thread(n):
time.sleep(random.random())
return n, time.time()
MAX_THREADS = 10
with ThreadPoolExecutor(max_workers=MAX_THREADS) as e:
futures = [e.submit(my_thread, n) for n in range(15)]
for f in futures:
print(f.result())
Prints:
(0, 1586782110.1816075)
(1, 1586782109.4404495)
(2, 1586782109.6663365)
(3, 1586782109.8307955)
(4, 1586782109.6733325)
(5, 1586782109.6103601)
(6, 1586782109.3914738)
(7, 1586782109.6803281)
(8, 1586782109.8587916)
(9, 1586782109.7173235)
(10, 1586782110.3664994)
(11, 1586782110.1816075)
(12, 1586782110.518443)
(13, 1586782110.4524374)
(14, 1586782110.0256832)
Upvotes: 1