Anthony Taylor

Reputation: 3383

Multithreading of For loop in python

I am creating some scripts to help with my database import while using Docker. I currently have a directory filled with data, and I want to import it as quickly as possible.

All of the work is single-threaded, so I wanted to speed things up by running multiple jobs at once, one per core on my server.

This is the code I've written to do that:

#!/usr/bin/python
import sys
import subprocess

cities = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];

for x in cities:
    dockerscript = "docker exec -it worker_1 perl import.pl ./public/%s %s mariadb" % (x,x)
    p = subprocess.Popen(dockerscript, shell=True, stderr=subprocess.PIPE)

This works fine if I have more than 10 cores, since each job gets its own. What I want is to set it up so that if I have 4 cores, the first 4 iterations of dockerscript run (1 to 4) and 5 to 10 wait.

Once any of 1 to 4 completes, 5 is started, and so on until everything has finished.

I am just having a hard time figuring out how to do this in Python.

Thanks

Upvotes: 1

Views: 83

Answers (2)

tdelaney

Reputation: 77337

John already has the answer, but there are a couple of subtleties worth mentioning. A thread pool is fine for this application because each thread just spends its time blocked, waiting for its subprocess to terminate. You can also use map with chunksize=1 so the pool goes back to the parent to fetch a new job on each iteration.

#!/usr/bin/python
import sys
import subprocess
import multiprocessing.pool

cities = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]

def run_docker(city):
    # subprocess.call blocks until the docker command exits, so each
    # pool thread handles exactly one import at a time
    return subprocess.call(['docker', 'exec', '-it', 'worker_1', 'perl',
        'import.pl', './public/{0}'.format(city), city, 'mariadb'])

# ThreadPool() defaults to one worker thread per CPU core
pool = multiprocessing.pool.ThreadPool()
results = pool.map(run_docker, cities, chunksize=1)
pool.close()
pool.join()

Upvotes: 1

John Zwinck

Reputation: 249093

You should use multiprocessing.Pool(), which will automatically create one process per core, then submit your jobs to it. Each job will be a function which calls subprocess to start Docker. Note that you need to make sure the jobs are synchronous, i.e. the Docker command must not return before it is done working.
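
For reference, here is a minimal sketch of that approach. It reuses the same docker exec command from the question, but with a process pool instead of a thread pool; multiprocessing.Pool() defaults to one worker per core, and subprocess.call blocks until the command exits, which keeps each job synchronous:

#!/usr/bin/python
import subprocess
import multiprocessing

cities = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]

def run_docker(city):
    # Blocks until this city's import has finished
    return subprocess.call(['docker', 'exec', '-it', 'worker_1', 'perl',
        'import.pl', './public/{0}'.format(city), city, 'mariadb'])

if __name__ == '__main__':
    # One worker process per core; map hands out one city at a time
    pool = multiprocessing.Pool()
    results = pool.map(run_docker, cities, chunksize=1)
    pool.close()
    pool.join()

pool.map blocks until every city has been processed and returns the exit status of each docker exec call.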

Upvotes: 2
