Reputation: 211
I am writing code to simulate getting data from a website, which takes 3-5 seconds per request, using multiple Selenium drivers. The intent is that it gets the batch data and goes through the retrieve() function asynchronously to speed up the process. I am trying to use an asyncio queue to achieve this, with queue.put() and queue.get(). The result is incorrect: the current_value items 11 to 15 have somehow disappeared, as if the while loop never ran at all. Is there anything missing in my code, or any advice to improve it?
import asyncio
import random
import time
async def retrieve(queue, lock_browser1,lock_browser2,lock_browser3,lock_browser4,lock_browser5):
while True:
item = await queue.get()
if lock_browser1.locked() is False:
async with lock_browser1:
await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
print(item)
break
else:
await asyncio.sleep(0)
if lock_browser2.locked() is False:
async with lock_browser2:
await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
print(item)
break
else:
await asyncio.sleep(0)
if lock_browser3.locked() is False:
async with lock_browser3:
await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
print(item)
break
else:
await asyncio.sleep(0)
if lock_browser4.locked() is False:
async with lock_browser4:
await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
print(item)
break
else:
await asyncio.sleep(0)
if lock_browser5.locked() is False:
async with lock_browser5:
await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
print(item)
break
else:
await asyncio.sleep(0)
if item is None:
break
async def process(queue,x,y):
start_value = 10
current_value = 0
stop_value = 15
current_value = start_value
while current_value <= stop_value:
await queue.put([x,y, current_value])
current_value = current_value + 1
await asyncio.sleep(0)
await queue.put(None)
async def main(x,y):
lock_browser1 = asyncio.Lock()
lock_browser2 = asyncio.Lock()
lock_browser3 = asyncio.Lock()
lock_browser4 = asyncio.Lock()
lock_browser5 = asyncio.Lock()
queue = asyncio.Queue()
await asyncio.gather(process(queue, x, y),retrieve(queue,lock_browser1,lock_browser2,lock_browser3,lock_browser4,lock_browser5))
if __name__ == "__main__":
start = time.perf_counter()
for x in range(2):
for y in range(5):
asyncio.run(main(x,y))
end = time.perf_counter()
duration=end-start
print(f'Time elapsed:{duration:.2f}s')
INCORRECT RESULT:
[0, 0, 10]
[0, 1, 10]
[0, 2, 10]
[0, 3, 10]
[0, 4, 10]
[1, 0, 10]
[1, 1, 10]
[1, 2, 10]
[1, 3, 10]
[1, 4, 10]
INTENDED RESULT:
[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
...
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]
Upvotes: 2
Views: 3019
Reputation: 23743
I also thought that retrieve should only have one webdriver with one url to get at a time. I wondered whether the queue was actually necessary: I thought I could launch and await a retrieve instance for each url, using the locks to limit a driver to one get at a time.
I wrote a driver proxy to simulate a non-async, blocking method. Running driver.get in an executor was the key to getting this working.
import asyncio
import random
import time
from pprint import pprint
from itertools import cycle,product
class Webdriver:
'''Blocking webdriver proxy.
'''
def __init__(self,name):
self.name = name
def get(self,url):
snooze = random.uniform(3,5)
time.sleep(snooze)
return (f'{self.name}: {url}',round(snooze,3))
# acquire the lock, wait for then return a result
async def retrieve(lock,driver,url):
await lock.acquire()
try:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None,driver.get,url)
finally:
lock.release()
return result
async def main():
# five locks and drivers
# pair a lock with a driver
drivers = [(asyncio.Lock(),Webdriver(f'driver{n}')) for n in range(5)]
drivers = cycle(drivers)
# fifty urls
urls = product(range(2),range(5),range(10,15))
# make fifty tasks, one for each url
args = zip(drivers,urls)
tasks = set()
for (lock,driver),url in args:
task = asyncio.create_task(retrieve(lock,driver,url))
tasks.add(task)
stuff = await asyncio.gather(*tasks)
return stuff
if __name__ == "__main__":
q = time.time()
result = asyncio.run(main())
print(f'exc_time:{time.time()-q:.1f}, ntasks:{len(result)}, retrieval_time:{sum(a[-1] for a in result):.1f}')
result.sort(key=lambda x: x[0][-10:])
# pprint(result)
This works and performs pretty well. It distributes the urls evenly across the (lock,driver) pairs. I think it mostly works because the get times are all fairly close: if a driver happened to get urls that were quick, it would run through them and then not be able to help anymore, and if a single url took a very long time it would hold up a driver and the other drivers wouldn't be able to take up the slack. For those two cases, multiple drivers pulling from a queue would work better - something along the lines of aaron's solution, sketched below.
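As an illustration (not from the original answer), here is a minimal sketch of that queue-based variant, reusing the Webdriver proxy class from the snippet above; the names worker and queue_main and the worker count of five are made up for the example:
# Sketch only: N workers, each owning one blocking driver, all pulling
# urls from a single shared asyncio.Queue. Webdriver is the proxy class above.
import asyncio
from itertools import product
async def worker(driver, queue, results):
    loop = asyncio.get_running_loop()
    while True:
        url = await queue.get()
        if url is None:               # sentinel: no more work
            await queue.put(None)     # let the other workers exit too
            break
        # run the blocking get() in the default thread-pool executor
        results.append(await loop.run_in_executor(None, driver.get, url))
async def queue_main():
    queue = asyncio.Queue()
    for url in product(range(2), range(5), range(10, 15)):
        queue.put_nowait(url)
    queue.put_nowait(None)            # sentinel marks the end of the urls
    results = []
    await asyncio.gather(*(worker(Webdriver(f'driver{n}'), queue, results)
                           for n in range(5)))
    return results
Run with asyncio.run(queue_main()): for evenly-sized jobs the timing should be about the same, but a single slow url now only stalls its own driver while the others keep draining the queue.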
Upvotes: 0
Reputation: 43073
That retrieve function allows only one browser (Selenium driver) to run at a time.
A more reasonable implementation, to simulate only one request per browser at a time:
async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
async def _retrieve(lock_browser):
while True:
async with lock_browser:
await asyncio.sleep(random.uniform(3, 5)) # simulate only one request per browser at a time
item = await queue.get()
if item is None:
await queue.put(None) # allow others to exit
break
print(item)
await asyncio.gather(
_retrieve(lock_browser1),
_retrieve(lock_browser2),
_retrieve(lock_browser3),
_retrieve(lock_browser4),
_retrieve(lock_browser5),
)
The process function and a suitable process_main function:
async def process(queue, x, y):
start_value = 10
stop_value = 15
current_value = start_value
while current_value <= stop_value:
await queue.put([x, y, current_value])
current_value = current_value + 1
await asyncio.sleep(0)
async def process_main(queue):
for x in range(2):
for y in range(5):
await process(queue, x, y)
await queue.put(None)
The main function should hold the lock_browser instances, and the if __name__ == "__main__": block should only call the main function once:
async def main():
lock_browser1 = asyncio.Lock()
lock_browser2 = asyncio.Lock()
lock_browser3 = asyncio.Lock()
lock_browser4 = asyncio.Lock()
lock_browser5 = asyncio.Lock()
queue = asyncio.Queue()
await asyncio.gather(
process_main(queue),
retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5),
)
if __name__ == "__main__":
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
duration = end - start
print(f'Time elapsed: {duration:.2f}s')
This runs in about 53 seconds on my machine (60 items spread over 5 browsers at 3-5 s apiece works out to roughly 12-13 rounds of about 4 s each).
Upvotes: 2
Reputation: 1842
TL;DR: Remove the break from the if-statements and find another way not to run two or more of them; the fix is at the end.
The code really does what you tell it to do. Let's break it down into smaller pieces.
if __name__ ...
if __name__ == "__main__":
start = time.perf_counter()
for x in range(2):
for y in range(5):
asyncio.run(main(x, y))
end = time.perf_counter()
duration = end - start
print(f'Time elapsed:{duration:.2f}s')
We have nested loops: the outer one (x) goes from 0-1 and the inner one (y) goes from 0-4, and on each iteration we call asyncio.run(main(x, y)). Keep in mind that calling asyncio.run one call after another does not make them run concurrently - it will first call main(0, 0), block everything else until it finishes, then move to the next call (i.e. main(0, 1)); the short sketch below illustrates this.
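To see this concretely (a small sketch, not from the original post; the coroutine name slow is made up), two back-to-back asyncio.run calls take the sum of their run times:
import asyncio
import time
async def slow(n):
    await asyncio.sleep(1)  # stand-in for one 1-second job
    return n
start = time.perf_counter()
for n in range(2):
    asyncio.run(slow(n))  # each call blocks until its own event loop finishes
print(f'{time.perf_counter() - start:.1f}s')  # ~2.0s: the calls run sequentially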
main function:
Following that, we called main(0, 0) and we move to the main function:
async def main(x, y):
lock_browser1 = asyncio.Lock()
lock_browser2 = asyncio.Lock()
lock_browser3 = asyncio.Lock()
lock_browser4 = asyncio.Lock()
lock_browser5 = asyncio.Lock()
queue = asyncio.Queue()
await asyncio.gather(process(queue, x, y),
retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5))
I will dismiss the locks part - we create a new queue, queue, and call asyncio.gather() - which, in contrast to asyncio.run, does run these coroutines concurrently (a short sketch of the difference follows below). Again, dismissing retrieve for now: we call process, again one at a time, because our main is being called one at a time.
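For contrast with the previous sketch (again just an illustration, reusing the made-up slow coroutine), gather schedules both awaitables in one event loop so they overlap:
import asyncio
import time
async def slow(n):
    await asyncio.sleep(1)  # stand-in for one 1-second job
    return n
async def both():
    return await asyncio.gather(slow(0), slow(1))  # both scheduled together
start = time.perf_counter()
asyncio.run(both())
print(f'{time.perf_counter() - start:.1f}s')  # ~1.0s: the two jobs overlap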
process function
async def process(queue, x, y):
start_value = 10
stop_value = 15
current_value = start_value
while current_value <= stop_value:
await queue.put([x, y, current_value])
current_value = current_value + 1
await queue.put(None)
Here we iterate through values 10-15 with our original x, y values. We put the list inside the queue - and... that's it, nothing more than that.
retrieve function
async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
while True:
item = await queue.get()
if lock_browser1.locked() is False:
async with lock_browser1:
await asyncio.sleep(random.uniform(0.2, 0.3)) # simulate only one browser connect to the website at a time
print(item)
break
else:
await asyncio.sleep(0)
I've not added the full function because it is too long and basically useless here - lock_browser1.locked() is always False at this point, meaning we always enter the first if-statement and then break out of the while True loop, making the other if-else blocks redundant (note that).
We "pinch" together two functions: process
and retrieve
and run them concurrently, the process
function is executing a fast while
loop that increments the last number to the list making it look like: [0, 0, 10], [0, 0, 11] ...
while retrieve
gets the elements in the queue, prints the first element, breaks from the while-loop and the whole process starts over with a clean fresh queue. So by removing the break
from the if-else statements it should be alright.
I know it was a long dig for a small change - hence the TL;DR above - but I wanted to point out some things I found important in case you missed them in your code.
I would add a flag that is true once we have entered an if-statement and false otherwise (I've also moved the check that item is not None to the head of the block, so we don't print None needlessly):
async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
while True:
has_entered_if = False
item = await queue.get()
if item is None:
break
if lock_browser1.locked() is False and not has_entered_if:
has_entered_if = True
async with lock_browser1:
await asyncio.sleep(random.uniform(3, 5)) # simulate only one browser connect to the website at a time
print(item)
else:
await asyncio.sleep(0)
if lock_browser2.locked() is False and not has_entered_if:
has_entered_if = True
async with lock_browser2:
await asyncio.sleep(random.uniform(3, 5)) # simulate only one browser connect to the website at a time
print(item)
else:
await asyncio.sleep(0)
if lock_browser3.locked() is False and not has_entered_if:
has_entered_if = True
async with lock_browser3:
await asyncio.sleep(random.uniform(3, 5)) # simulate only one browser connect to the website at a time
print(item)
else:
await asyncio.sleep(0)
if lock_browser4.locked() is False and not has_entered_if:
has_entered_if = True
async with lock_browser4:
await asyncio.sleep(random.uniform(3, 5)) # simulate only one browser connect to the website at a time
print(item)
else:
await asyncio.sleep(0)
if lock_browser5.locked() is False and not has_entered_if:
async with lock_browser5:
await asyncio.sleep(random.uniform(3, 5)) # simulate only one browser connect to the website at a time
print(item)
else:
await asyncio.sleep(0)
Output:
[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
...
[1, 4, 10]
[1, 4, 11]
[1, 4, 12]
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]
Upvotes: 2