jguy
jguy

Reputation: 211

How do I run my code asynchronously with asyncio queue.put() and queue.get() when part of the tasks seems to be lost?

I am writing code that simulates getting data from a website using multiple Selenium drivers, where each request takes 3–5 seconds. I want it to take the batch data and run it through the retrieve() function asynchronously to speed up the process. I am trying to use an asyncio queue for this, via queue.put() and queue.get(). The result is incorrect: the entries with current_value 11 to 15 somehow disappear, as if the while loop never ran. Is something missing in my code, or do you have advice on improving it?

import asyncio
import random
import time


async def retrieve(queue, lock_browser1,lock_browser2,lock_browser3,lock_browser4,lock_browser5): 
    """Take items from ``queue`` and "fetch" each with the first free browser.

    The fetch is simulated by sleeping 3-5 s while holding that browser's
    lock, then printing the item.

    NOTE(review): every lock branch below ends with ``break``, which leaves
    the ``while True`` loop after the FIRST item has been printed. This is
    why only current_value 10 shows up: the items for 11-15 are never taken
    off the queue. Also, in this program the locks are used only by this
    single coroutine, so lock_browser1 is always free when checked, and
    branches 2-5 are never reached.
    """

    while True:

        item = await queue.get()


        if lock_browser1.locked() is False:
            async with lock_browser1:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break  # NOTE(review): exits the while loop, not just this if-chain
        else:
            await asyncio.sleep(0)

        if lock_browser2.locked() is False:
            async with lock_browser2:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)
        
        if lock_browser3.locked() is False:
            async with lock_browser3:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)
        
        if lock_browser4.locked() is False:
            async with lock_browser4:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)

        if lock_browser5.locked() is False:
            async with lock_browser5:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)        
        
        # Only reached when all five locks were busy; otherwise a branch above
        # has already broken out of the loop (and printed the item, even if it
        # was the None sentinel).
        if item is None:
            break


async def process(queue,x,y):
    """Producer: put ``[x, y, v]`` on ``queue`` for v = 10..15, then ``None``."""
    start_value  = 10
    current_value = 0  # dead store: overwritten by start_value below
    stop_value = 15    
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x,y, current_value])          
        current_value = current_value + 1  
        await asyncio.sleep(0)  # yield to the event loop between puts

    await queue.put(None)  # sentinel marking the end of this batch


    
    
async def main(x,y):
    """Run the producer for one (x, y) batch alongside the retrieve consumer."""
    
    # Five fresh locks per call, one per simulated browser.
    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()    
    
    queue = asyncio.Queue()    
    # gather() runs the producer and the (single) consumer concurrently.
    await asyncio.gather(process(queue, x, y),retrieve(queue,lock_browser1,lock_browser2,lock_browser3,lock_browser4,lock_browser5))
  

if __name__ == "__main__":
    start = time.perf_counter()
    # NOTE(review): each asyncio.run() blocks until main(x, y) finishes, so
    # the ten (x, y) batches run one after another, never concurrently.
    for x in range(2):   
        for y in range(5):
                asyncio.run(main(x,y))
    end = time.perf_counter()
    duration=end-start
    print(f'Time elapsed:{duration:.2f}s')

INCORRECT RESULT:

[0, 0, 10]
[0, 1, 10]
[0, 2, 10]
[0, 3, 10]
[0, 4, 10]
[1, 0, 10]
[1, 1, 10]
[1, 2, 10]
[1, 3, 10]
[1, 4, 10]

INTENDED RESULT:

[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
   ...
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]

Upvotes: 2

Views: 3019

Answers (3)

wwii
wwii

Reputation: 23743

I also thought that retrieve should only handle one webdriver with one url at a time. I wondered whether the queue was actually necessary; I thought I could launch and await a retrieve instance for each url, using the locks to limit each driver to one get at a time.

I wrote a driver proxy to simulate a non-async blocking method. Running driver.get in an executor was the key to getting this working.

import asyncio
import random
import time
from pprint import pprint
from itertools import cycle,product

class Webdriver:
    """Stand-in for a blocking (non-async) Selenium webdriver.

    ``get`` blocks the calling thread for a random 3-5 seconds to mimic a
    page load, then reports which driver served which url.
    """

    def __init__(self, name):
        self.name = name

    def get(self, url):
        """Block for 3-5 s; return ``('<name>: <url>', seconds_slept)``."""
        delay = random.uniform(3, 5)
        time.sleep(delay)
        label = f'{self.name}: {url}'
        return label, round(delay, 3)

async def retrieve(lock, driver, url):
    """Fetch ``url`` with ``driver`` while holding ``lock``; return the result.

    ``driver.get`` is blocking, so it runs in the event loop's default
    executor. That keeps the loop free to drive the other retrieve tasks
    while this one waits. The lock limits each driver to one url at a time.

    Returns whatever ``driver.get(url)`` returns. Exceptions raised by it
    propagate to the caller after the lock has been released.
    """
    # ``async with`` releases the lock on both success and error, replacing
    # the manual acquire()/try/finally/release() sequence.
    async with lock:
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, driver.get, url)

async def main():
    """Fan the simulated url fetches out over five (lock, driver) pairs.

    Urls are assigned to the pairs round-robin, and each pair's lock keeps
    its driver to one fetch at a time. Returns the list of results in the
    same order as the urls.
    """
    # Pair each driver with its own lock.
    pairs = [(asyncio.Lock(), Webdriver(f'driver{n}')) for n in range(5)]

    # 2 * 5 * 5 = 50 urls; note that range(10, 15) stops at 14.
    urls = product(range(2), range(5), range(10, 15))

    # One task per url, handed to the pairs round-robin. A list (not a set)
    # keeps the tasks in url order, so gather() returns results in a
    # deterministic order.
    tasks = [
        asyncio.create_task(retrieve(lock, driver, url))
        for (lock, driver), url in zip(cycle(pairs), urls)
    ]

    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    t0 = time.time()
    result = asyncio.run(main())
    elapsed = time.time() - t0
    total_get = sum(r[-1] for r in result)
    print(f'exc_time:{elapsed:.1f}, ntasks:{len(result)}, retrieval_time:{total_get:.1f}')
    # Order results by the trailing "(x, y, v)" part of each driver label.
    result.sort(key=lambda r: r[0][-10:])
    # pprint(result)  # uncomment to inspect the sorted results

This works and performs pretty well. It distributes the urls evenly across the (lock, driver) pairs. I think it mostly works because the get times are all fairly close. If a driver happened to get urls that were quick, it would run through them and then be unable to help any more. If a single url took a very long time, it would hold up a driver, and the other drivers wouldn't be able to take up the slack. For those two cases, multiple drivers pulling from a queue would work better — something along the lines of aaron's solution.

Upvotes: 0

aaron
aaron

Reputation: 43073

That retrieve function allows only one browser (Selenium driver) to run at a time.

A more reasonable implementation, to simulate only one request per browser at a time:

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3,
                   lock_browser4, lock_browser5, *, delay=(3, 5)):
    """Drain ``queue`` with five concurrent workers, one per browser lock.

    Each worker repeatedly takes an item. It holds its own browser's lock
    while the simulated request sleeps for ``random.uniform(*delay)``
    seconds, then prints the item. ``None`` is the shutdown sentinel: the
    worker that receives it puts it back, so every other worker stops too.

    ``delay`` is the (low, high) range of the simulated request time in
    seconds; it defaults to the original 3-5 s.
    """
    async def _retrieve(lock_browser):
        while True:
            # Take the item first, so the simulated request below belongs to
            # this item. A worker that receives the sentinel then exits
            # immediately instead of sleeping 3-5 s first.
            item = await queue.get()
            if item is None:
                await queue.put(None)  # allow others to exit
                break
            async with lock_browser:
                await asyncio.sleep(random.uniform(*delay))  # simulate only one request per browser at a time
                print(item)

    browsers = (lock_browser1, lock_browser2, lock_browser3,
                lock_browser4, lock_browser5)
    await asyncio.gather(*(_retrieve(lock) for lock in browsers))

The process function and a suitable process_main function:

async def process(queue, x, y):
    """Put ``[x, y, v]`` on ``queue`` for v = 10..15, yielding after each put."""
    for value in range(10, 15 + 1):
        await queue.put([x, y, value])
        await asyncio.sleep(0)  # let the consumers run between puts


async def process_main(queue):
    """Feed every (x, y) batch into ``queue`` in order, then the ``None`` sentinel."""
    batches = [(x, y) for x in range(2) for y in range(5)]
    for x, y in batches:
        await process(queue, x, y)
    # Sentinel: tells the retrieve workers that no more work is coming.
    await queue.put(None)

The main function should hold the lock_browser instances, and the if __name__ == "__main__": block should only call the main function once:

async def main():
    """Run the producer and the five-browser consumer concurrently on one queue."""
    browser_locks = [asyncio.Lock() for _ in range(5)]
    queue = asyncio.Queue()
    producer = process_main(queue)
    consumer = retrieve(queue, *browser_locks)
    await asyncio.gather(producer, consumer)


if __name__ == "__main__":
    t_start = time.perf_counter()
    asyncio.run(main())  # a single event loop for the whole workload
    elapsed = time.perf_counter() - t_start
    print(f'Time elapsed: {elapsed:.2f}s')

This runs in about 53 seconds on my machine.

Upvotes: 2

0Interest
0Interest

Reputation: 1842

TL;DR: remove the `break` from the if-statements and find another way to avoid running more than one of them; the fix is at the end.

The code really does what you tell it to do. Let's break it down into smaller pieces.

Part 1 - if __name__ ...

if __name__ == "__main__":
    start = time.perf_counter()
    # Each asyncio.run() call blocks until main(x, y) completes, so these
    # ten calls run sequentially.
    for x in range(2):
        for y in range(5):
            asyncio.run(main(x, y))
    end = time.perf_counter()
    duration = end - start
    print(f'Time elapsed:{duration:.2f}s')

We have nested loops: the outer one (x) goes from 0 to 1 and the inner one (y) goes from 0 to 4, and each iteration calls asyncio.run(main(x, y)). Keep in mind that calling asyncio.run one after another does not make the calls run concurrently. It first calls main(0, 0) and blocks everything else until it finishes, then moves on to the next call (i.e. main(0, 1)).

Part 2 - The main function:

Following that, we called main(0, 0) and we move to the main function:

async def main(x, y):
    """Quoted from the question: run process and retrieve concurrently for one batch."""

    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()

    queue = asyncio.Queue()  # a fresh queue for every main(x, y) call
    await asyncio.gather(process(queue, x, y),
                         retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5))

Setting the locks aside for now: we create a new queue, queue, and call asyncio.gather(). Unlike consecutive asyncio.run calls, gather does run these coroutines concurrently. Setting retrieve aside as well, process is also called only once at a time, because main itself is called one at a time.

Part 3 - The process function

async def process(queue, x, y):
    """Quoted from the question (minus its ``await asyncio.sleep(0)``): enqueue v = 10..15, then None."""
    start_value = 10
    stop_value = 15
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x, y, current_value])
        current_value = current_value + 1
    await queue.put(None)  # end-of-batch sentinel

Here we iterate through values 10–15 with our original x, y values and put each list into the queue — nothing more than that.

Part 4 - The retrieve function

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    """Abridged copy of the question's retrieve(): only the first lock branch is shown.

    NOTE(review): the question sleeps random.uniform(3, 5) here; this quote
    uses 0.2-0.3 s, presumably to make the demonstration run faster.
    """
    while True:

        item = await queue.get()
        if lock_browser1.locked() is False:

            async with lock_browser1:
                await asyncio.sleep(random.uniform(0.2, 0.3))  # simulate only one browser connect to the website at a time
                print(item)
            break  # leaves the while loop after the first item
        else:
            await asyncio.sleep(0)

I haven't included the full function because it is long and the rest is effectively unused. lock_browser1.locked() is always False, so we always enter the first if-statement and then break out of the while True loop. That makes the other if/else blocks redundant (note that).


Summary:

We combine two coroutines, process and retrieve, and run them concurrently. process executes a fast while loop that increments the last number of the list, producing [0, 0, 10], [0, 0, 11], ... . Meanwhile, retrieve takes elements from the queue, prints the first one and breaks out of the while loop, and then the whole process starts over with a fresh queue. So removing the break from the if/else statements should fix it.

I know this was a long dig for a small change, so I've added a TL;DR. Still, I wanted to point out some things I found important, in case you missed them in your code.

Fix

I would add a flag that is True once we have entered one of the if statements, and False otherwise. I've also moved the `item is None` check to the top of the loop, so we don't pointlessly print None:

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3,
                   lock_browser4, lock_browser5, *, delay=(3, 5)):
    """Print every item taken from ``queue`` until the ``None`` sentinel arrives.

    Each item is handled under the first browser lock that is currently
    free. The request itself is simulated with a ``random.uniform(*delay)``
    second sleep (3-5 s by default). If every lock is busy, the item waits
    for ``lock_browser1`` rather than being skipped.

    NOTE: this is a single consumer coroutine, so items are still handled
    one after another. Spreading work across browsers concurrently needs
    several consumers draining the same queue.
    """
    locks = (lock_browser1, lock_browser2, lock_browser3,
             lock_browser4, lock_browser5)
    while True:
        item = await queue.get()

        if item is None:
            break

        # Pick the first free browser. If all of them are busy, wait for
        # browser 1 so the item is never silently dropped.
        lock = next((candidate for candidate in locks if not candidate.locked()),
                    lock_browser1)
        async with lock:
            await asyncio.sleep(random.uniform(*delay))  # simulate only one browser connect to the website at a time
            print(item)

Output:

[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
...
[1, 4, 10]
[1, 4, 11]
[1, 4, 12]
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]

Upvotes: 2

Related Questions