MemeLord

Reputation: 23

Splitting a list into batches to process the elements of each batch on multiple threads

I am trying to pass each element from my list to a function that is started on its own thread to do its own work. The problem is that if the list has 100+ elements, it will start 100 functions on 100 threads.

For the sake of my computer I want to process the list in batches of 10, with the following steps:

  1. Batch 1 gets queued.
  2. Pass each element from batch 1 to the function, which gets started on its own thread (this way only 10 function threads will be running at a time)
  3. Once all 10 threads have finished, they get popped off from their queue
  4. Repeat until all batches are done

I was trying to use two lists, where the first 10 elements get popped into list2. Process list2 and, once its threads are done, pop 10 more elements, until list1 reaches a length of 0.

I have gotten this far but am not sure how to proceed.

    import threading

    carsdotcomOptionVal, carsdotcomOptionMakes = getMakes()
    second_list = []

    threads = []

    while len(carsdotcomOptionVal) != 0:
        second_list.append(carsdotcomOptionVal.pop(10))
        for makesOptions in second_list:
            th = threading.Thread(target=getModels, args=[makesOptions])
            th.start()
            threads.append(th)

        for thread in threads:
            thread.join()

Lastly, the number of elements in the main list doesn't have to be even; it can be odd.
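The batching described in the steps above (including an uneven final batch) can be sketched as follows. This is only a sketch: `fake_get_models` is a hypothetical stand-in for the real `getModels` function.

```python
import threading

def process_batch(batch, work_fn):
    """Start one thread per element of the batch and wait for all of them to finish."""
    threads = [threading.Thread(target=work_fn, args=(item,)) for item in batch]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

def process_in_batches(items, work_fn, batch_size=10):
    # Slicing handles an uneven final batch: the last slice is simply shorter.
    for start in range(0, len(items), batch_size):
        process_batch(items[start:start + batch_size], work_fn)

# Demo with a stand-in for getModels (hypothetical; your own function goes here).
results = []
results_lock = threading.Lock()

def fake_get_models(item):
    with results_lock:  # lists need a lock when appended to from many threads
        results.append(item * 2)

process_in_batches(list(range(25)), fake_get_models, batch_size=10)
```

With 25 input items and a batch size of 10, the last batch has only 5 elements and is processed the same way.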

Upvotes: 1

Views: 2260

Answers (1)

Ofer Sadan

Reputation: 11942

You should use a queue.Queue object, which gives you a thread-safe list of tasks for "worker" threads. You can choose how many worker threads are active, and each of them will feed from the queue until it is empty.

Here's what sample code looks like with a queue:

    import queue
    import threading

    threads_to_start = 10  # or choose how many you want
    my_queue = queue.Queue()

    def worker():
        while True:
            try:
                data = my_queue.get_nowait()  # raises queue.Empty when nothing is left
            except queue.Empty:
                break
            do_something_with_data(data)  # replace with your own processing function
            my_queue.task_done()

    for i in range(100):
        my_queue.put(i)  # replace "i" with whatever data you want the threads to process

    for i in range(threads_to_start):
        t = threading.Thread(target=worker, daemon=True)  # daemon threads exit when the main thread exits
        t.start()

    my_queue.join()  # blocks the main thread until every item has been processed and marked with task_done()

Keep in mind this is just a rough, pseudo-code start to introduce you to threading and queues. There is more to it than that, but this example should work in most simple use cases.

This is scalable too: if you can support more or fewer threads, all you have to change is the number you initially set in threads_to_start.
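For comparison (not part of the answer above), the same worker-pool pattern is available ready-made in the standard library's concurrent.futures.ThreadPoolExecutor, where max_workers plays the role of threads_to_start:

```python
from concurrent.futures import ThreadPoolExecutor

def do_something_with_data(data):
    # Stand-in for the real per-item work.
    return data * 2

# The pool keeps at most 10 worker threads busy, feeding them items as they finish.
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(do_something_with_data, range(100)))
```

pool.map returns results in input order, which saves the manual queue bookkeeping when you need the outputs.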

Upvotes: 3
