Reputation: 4030
I have a function:
import time

def all_40k():
    for _ in range(400000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    start_time = time.time()
    all_40k()
    print(f'used time:{time.time()-start_time}')
The output is:
used time:9.545064210891724
Since this is the same work repeated 400,000 times, I want to have four functions running in parallel, each handling 100,000 iterations; ideally this would be about 4 times faster.
So I first tried multithreading:
import threading
import time

def first_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def second_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def third_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

def forth_10k():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

if __name__ == '__main__':
    start_time = time.time()
    thread1 = threading.Thread(target=first_10k)
    thread2 = threading.Thread(target=second_10k)
    thread3 = threading.Thread(target=third_10k)
    thread4 = threading.Thread(target=forth_10k)
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
    print(f'used time:{time.time()-start_time}')
To my surprise, the output is:
used time:23.058093309402466
And then I tried asyncio:
import time
import asyncio

async def test_1():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

async def test_2():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

async def test_3():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

async def test_4():
    for _ in range(100000):
        print('validate')
        print('parsing')
        print('inserting')

async def multiple_tasks():
    input_coroutines = [test_1(), test_2(), test_3(), test_4()]
    res = await asyncio.gather(*input_coroutines, return_exceptions=True)
    return res

if __name__ == '__main__':
    start_time = time.time()
    res1, res2, res3, res4 = asyncio.get_event_loop().run_until_complete(multiple_tasks())
    print(f'used time:{time.time()-start_time}')
The output is:
used time:9.295843601226807
In the end I tried ProcessPoolExecutor:
import time
from concurrent.futures import ProcessPoolExecutor

def data_handler(urls):
    for i in range(urls[0], urls[1]):
        print('validate')
        print('parsing')
        print('inserting')

def run():
    urls = [(1, 100000), (100001, 200000), (200001, 300000), (300001, 400000)]
    with ProcessPoolExecutor() as executor:
        executor.map(data_handler, urls)

if __name__ == '__main__':
    start_time = time.time()
    run()
    stop_time = time.time()
    print('used time %s' % (stop_time - start_time))
The output is:
used time 12.726619243621826
So how can I speed up the process? I think I went about it the wrong way. Can anyone help? Best regards!
Upvotes: 3
Views: 1085
Reputation: 23624
Okay, so what you noticed:
No parallelism 9.545064210891724
asyncio 9.295843601226807
ProcessPoolExecutor 12.726619243621826
multithreading 23.058093309402466
First off, asyncio doesn't actually use threads, and, as you might guess, its performance relies on there being some I/O. Asyncio alternates between tasks in a loop, switching whenever one hits an await. If await is never used, it just ends up running each task one at a time and never switches at all.
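To make that concrete, here's a minimal sketch (the names cpu_task and io_task are just illustrative, and asyncio.sleep stands in for a real I/O call): four coroutines that never await run back to back, while four coroutines that do await overlap their waits:

import asyncio
import time

async def cpu_task(name):
    # never awaits, so it blocks the event loop until it finishes
    for _ in range(1000000):
        pass

async def io_task(name):
    # each await hands control back to the event loop, so other
    # tasks can run while this one "waits on I/O"
    for _ in range(5):
        await asyncio.sleep(0.1)  # stand-in for a real I/O call

async def main():
    start = time.time()
    await asyncio.gather(*(cpu_task(i) for i in range(4)))
    print(f'4 CPU-style coroutines: {time.time() - start:.2f}s')  # about the sum of all four

    start = time.time()
    await asyncio.gather(*(io_task(i) for i in range(4)))
    print(f'4 I/O-style coroutines: {time.time() - start:.2f}s')  # about the time of one (~0.5s)

if __name__ == '__main__':
    asyncio.run(main())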
With threads, only one thread can hold control of the Python interpreter at a time, because of the Global Interpreter Lock. What you end up with here is a lot of contention between the different threads all trying to do work at the same time, and that context switching is what slows your app down. As with asyncio, you only really get a speedup if you can schedule other work while waiting on some I/O.
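For comparison, here's a small sketch of where threads do pay off despite the GIL; the time.sleep call is only a stand-in for real blocking I/O such as a network or database call, and the timings in the comments are rough expectations:

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io_call(_):
    # time.sleep releases the GIL while it waits, just like a blocking
    # socket read or database query would
    time.sleep(0.5)

if __name__ == '__main__':
    start = time.time()
    for i in range(4):
        fake_io_call(i)
    print(f'serial:   {time.time() - start:.2f}s')   # ~2.0s

    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as ex:
        list(ex.map(fake_io_call, range(4)))
    print(f'threaded: {time.time() - start:.2f}s')   # ~0.5s, the waits overlap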
Okay, so now surely the multiprocessing case should have run faster, right? Well, each process does have its own interpreter lock; the holdup, however, is in your print statements. Each process is blocked trying to send its output to the same console pipe. Let me show you with an example.
Say we have a function that we run 4 times, first serially and then in parallel:
import time
from concurrent.futures import ProcessPoolExecutor

def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(500000):
        print('foobar')
    print(f"Finished thread: {thread}")

def run_singlethreaded():
    start_time = time.time()
    for thread in ["local"] * 4:
        run(thread)
    stop_time = time.time()
    return stop_time - start_time

def run_multiprocessing():
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=4) as ex:
        ex.map(run, ["mp0", "mp1", "mp2", "mp3"])
    stop_time = time.time()
    return stop_time - start_time

if __name__ == '__main__':
    singlethreaded_time = run_singlethreaded()
    multiprocessing_time = run_multiprocessing()
    print(f"Finished singlethreaded in: {singlethreaded_time}")
    print(f"Finished multiprocessing in: {multiprocessing_time}")
If we run this and print the timings, you might be surprised to see:
Finished singlethreaded in: 10.513998746871948
Finished multiprocessing in: 12.252000570297241
Now, if we change the loop body to something simpler that doesn't cause an I/O bottleneck:
def run(thread):
    print(f"Starting thread: {thread}")
    for i in range(100000000):
        pass
    print(f"Finished thread: {thread}")
you get the parallelism speedup you expect:
Finished singlethreaded in: 9.816999435424805
Finished multiprocessing in: 2.503000020980835
The important takeaway here is that before parallelism can help you, you need to understand where you are resource constrained. For I/O-bound applications, threading or asyncio can be helpful. For CPU-bound applications, multiprocessing can be useful. And there are times when neither really helps (like the print statements here), because the bottleneck exists in a system external to the app. If you really do need all of that output, batch it into fewer writes instead of adding parallelism; see the sketch below. Hope this helps!
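A minimal sketch of that batching idea, assuming the prints stand in for output you actually need (the function name and batch size are just illustrative):

import sys
import time

def all_40k_batched(batch_size=10000):
    # Collect output lines in memory and write them to stdout in large
    # chunks instead of calling print() 1.2 million times; when the
    # console is the bottleneck this helps far more than parallelism.
    lines = []
    for _ in range(400000):
        lines.append('validate')
        lines.append('parsing')
        lines.append('inserting')
        if len(lines) >= batch_size:
            sys.stdout.write('\n'.join(lines) + '\n')
            lines.clear()
    if lines:
        sys.stdout.write('\n'.join(lines) + '\n')

if __name__ == '__main__':
    start_time = time.time()
    all_40k_batched()
    # timing goes to stderr so it isn't mixed into the batched output
    print(f'used time:{time.time() - start_time}', file=sys.stderr)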
Upvotes: 5