William

Reputation: 4030

What is the best way to run multiple tasks in parallel in Python?

I have a function:

import time

def all_400k():
    for _ in range(400000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    start_time = time.time()
    all_400k()
    print(f'used time:{time.time()-start_time}')

The output is:

used time:9.545064210891724

Because this is the same work repeated 400k times, I want four functions running in parallel, each handling 100k iterations; ideally this would be four times faster.

So I first tried multithreading:

import threading
import time

def first_100k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def second_100k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def third_100k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def fourth_100k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    start_time = time.time()
    thread1 = threading.Thread(target=first_100k)
    thread2 = threading.Thread(target=second_100k)
    thread3 = threading.Thread(target=third_100k)
    thread4 = threading.Thread(target=fourth_100k)

    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()

    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    print(f'used time:{time.time()-start_time}')

To my surprise, the output is:

used time:23.058093309402466

And then I tried asyncio:

import time
import asyncio

async def test_1():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_2():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_3():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def test_4():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')


async def multiple_tasks():
    input_coroutines = [test_1(), test_2(), test_3(), test_4()]
    res = await asyncio.gather(*input_coroutines, return_exceptions=True)
    return res

if __name__ == '__main__':
    start_time = time.time()
    res1, res2, res3, res4 = asyncio.run(multiple_tasks())
    print(f'used time:{time.time()-start_time}')

The output is:

used time:9.295843601226807

In the end I tried ProcessPoolExecutor:

import time
from concurrent.futures import ProcessPoolExecutor
def data_handler(bounds):
    for _ in range(bounds[0], bounds[1]):
        print('validate')
        print('parsing')
        print('inserting')

def run():
    ranges = [(0, 100000), (100000, 200000), (200000, 300000), (300000, 400000)]
    with ProcessPoolExecutor() as executor:
        executor.map(data_handler, ranges)

if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('used time %s' % (stop_time - start_time))

The output is:

used time 12.726619243621826

So how can I speed up the process? I think I went about this the wrong way. Can anyone help? Best regards!

Upvotes: 3

Views: 1085

Answers (1)

flakes

Reputation: 23624

Okay, so here is what you observed:

No parallelism      9.545064210891724
asyncio             9.295843601226807
multithreading     23.058093309402466
multiprocessing    12.726619243621826

First off, asyncio doesn't actually use threads, and, as you might guess, its performance relies on there being some I/O. Asyncio alternates between tasks in an event loop, switching whenever one of them hits an await. If await is never used, it simply runs each task to completion, one at a time, without switching at all.
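
For example, adding an await inside the loop, even a zero-second sleep, is what gives the event loop a chance to switch between tasks. A minimal sketch (the coroutine names here are mine, not from your code):

import asyncio

async def tick(name):
    for i in range(3):
        print(f'{name}: {i}')
        # Without this await, each coroutine runs to completion
        # before the next one starts; with it, the event loop can
        # switch to the other task on every iteration.
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(tick('task1'), tick('task2'))

asyncio.run(main())

With the sleep in place the two tasks interleave; delete it and they run back to back, which is exactly why your asyncio version took about as long as the serial one.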

With threads, only one thread can hold control of the Python interpreter at a time, due to the Global Interpreter Lock (GIL). What you end up with here is contention from the different threads all trying to do work at once, and that context switching is what's slowing down your app. As with asyncio, you really only get those speedups if the threads can schedule other work while waiting on some I/O.
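
To see the flip side, here is a minimal sketch where the "work" is a simulated I/O wait (time.sleep standing in for a network or disk call). Because sleeping releases the GIL, four threads finish in roughly one second instead of four:

import threading
import time

def fake_io():
    # time.sleep releases the GIL, so the other threads can run
    # while this one waits, just like real network or disk I/O.
    time.sleep(1)

if __name__ == '__main__':
    start = time.time()
    threads = [threading.Thread(target=fake_io) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f'4 one-second waits took ~{time.time() - start:.1f}s')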

Okay, so now surely the multiprocessing case should have run faster... right? Well, each process does have its own interpreter lock; the holdup, however, is in your print statements. Each process blocks while trying to send its output to the same console pipe. Let me show you with an example.

Say we have a function to be run four times: once serially and once in parallel.

import time
from concurrent.futures import ProcessPoolExecutor


def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(500000):
        print('foobar')
    print(f"Finished thread: {thread}")


def run_singlethreaded():
    start_time = time.time()

    for thread in ["local"] * 4:
        run(thread)

    stop_time = time.time()
    return stop_time - start_time


def run_multiprocessing():
    start_time = time.time()

    with ProcessPoolExecutor(max_workers=4) as ex:
        ex.map(run, ["mp0", "mp1", "mp2", "mp3"])

    stop_time = time.time()
    return stop_time - start_time

if __name__ == '__main__':
    singlethreaded_time = run_singlethreaded()
    multiprocessing_time = run_multiprocessing()
    print(f"Finished singlethreaded in:  {singlethreaded_time}")
    print(f"Finished multiprocessing in: {multiprocessing_time}")

If we run this and print the timings, you might be surprised to see:

Finished singlethreaded in:  10.513998746871948
Finished multiprocessing in: 12.252000570297241

Now if we change the print to something simpler that doesn't cause an I/O bottleneck:

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(100000000):
        pass
    print(f"Finished thread: {thread}")

You get the parallel speedup you expect:

Finished singlethreaded in:  9.816999435424805
Finished multiprocessing in: 2.503000020980835

The important takeaway here is that before parallelism can help you, you need to understand where you are resource constrained. For I/O-bound applications, threading or asyncio can be helpful. For CPU-bound applications, multiprocessing can be useful. And there are times when neither will really help you (like the print statements here), because the bottleneck exists in a system external to the app.
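
As a concrete illustration of that last point, here is one possible rework of your ProcessPoolExecutor attempt. This is a sketch, not the only approach: it assumes the per-item output can be collected in memory rather than printed line by line. Each worker builds its output locally and hands back one string, so the processes stop contending for the console pipe:

import time
from concurrent.futures import ProcessPoolExecutor

def data_handler(bounds):
    # Collect output in memory instead of printing every line;
    # the workers no longer fight over the shared console pipe.
    lines = []
    for _ in range(bounds[0], bounds[1]):
        lines.append('validate')
        lines.append('parsing')
        lines.append('inserting')
    return '\n'.join(lines)

if __name__ == '__main__':
    start_time = time.time()
    ranges = [(0, 100000), (100000, 200000),
              (200000, 300000), (300000, 400000)]
    with ProcessPoolExecutor(max_workers=4) as executor:
        for chunk in executor.map(data_handler, ranges):
            pass  # or write each chunk to a file / log sink in one call
    print(f'used time:{time.time() - start_time}')

Hope this helps!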

Upvotes: 5
