Goodies

Reputation: 4681

How can I change this code to use context managers?

I'm trying to log into a website simultaneously using multiple credentials with aiohttp and asyncio. In the create_tasks function, I generate a list of sessions, one for each login. The reason I cannot just create a session within the login function is that the same session object will be used throughout the code. What I'm trying to do is devise a way to use a context manager to handle closing the sessions (to avoid the runtime errors caused by leaving them open).

The following code works as intended (concurrent gathering of the login page and parsing of the token in a process pool), but it generates sessions separately from the tasks and requires me to close them at the end.

from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import asyncio

#TODO: make this safe, handle exceptions

LOGIN_URL = "http://example.com/login"
CLIENT_CNT = 10
proc_pool = ProcessPoolExecutor(CLIENT_CNT)

def get_key(text):
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"})
    return key.get("value", None)

async def login(username:str, password:str, session:aiohttp.ClientSession, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None):
    loop = loop or asyncio.get_event_loop()
    async with sem:
        async with session.get(LOGIN_URL) as resp:
            x = await asyncio.ensure_future(loop.run_in_executor(proc_pool, get_key, await resp.text()))
            print(x)

def create_tasks(usernames, passwords, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None):
    loop = loop or asyncio.get_event_loop()
    tasks = []
    sessions = []
    for u, p in zip(usernames, passwords):
        session = aiohttp.ClientSession(loop=loop)
        sessions.append(session)
        tasks.append(login(u, p, session, sem, loop))
    return tasks, sessions

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    sem = asyncio.BoundedSemaphore(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    tasks, sessions = create_tasks(usernames, passwords, sem, loop)
    loop.run_until_complete(asyncio.gather(*tasks))
    for session in sessions:
        # In aiohttp 3.x, ClientSession.close() is a coroutine and must be awaited
        loop.run_until_complete(session.close())

I previously made create_tasks a coroutine, wrote a wrapper class to make async iterables, and tried using

async with aiohttp.ClientSession() as session:
    tasks.append(login(u, p, session, sem, loop))

But as I feared, it said that the session was already closed by the time it was run.
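The reason is that calling a coroutine function only creates a coroutine object; its body does not run until it is awaited or scheduled. A minimal stdlib-only sketch, using a hypothetical `FakeSession` stand-in for `aiohttp.ClientSession`, shows the ordering: each `async with` block has already closed its session by the time `gather` actually starts the login coroutine.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession that only tracks open/closed state."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True


async def login(session):
    # Runs only when awaited, i.e. after the `async with` block below has exited
    return session.closed


async def create_tasks(n):
    tasks = []
    for _ in range(n):
        async with FakeSession() as session:
            # This only creates a coroutine object; login's body has not started yet
            tasks.append(login(session))
    return tasks


async def main():
    tasks = await create_tasks(3)
    return await asyncio.gather(*tasks)


print(asyncio.run(main()))  # [True, True, True]: every session was closed before login ran
```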

Upvotes: 7

Views: 1427

Answers (3)

Alvra

Reputation: 425

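Use an exit stack. Since aiohttp.ClientSession is an asynchronous context manager, use contextlib.AsyncExitStack (Python 3.7+) rather than the synchronous ExitStack.

import asyncio
from contextlib import AsyncExitStack

import aiohttp

async def create_tasks(..., stack):
    tasks = []
    for username in usernames:
        # Register each session on the stack; it stays open until the stack unwinds
        session = await stack.enter_async_context(aiohttp.ClientSession())
        tasks.append(...)
    return tasks

async def main():
    async with AsyncExitStack() as stack:
        tasks = await create_tasks(..., stack)
        await asyncio.gather(*tasks)
    # All sessions are closed once the async with block exits

if __name__ == "__main__":
    asyncio.run(main())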
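A runnable, self-contained sketch of the exit-stack approach using `contextlib.AsyncExitStack`, the async counterpart of `ExitStack`. `FakeSession` is a hypothetical stand-in for `aiohttp.ClientSession` so the example runs without network access. The stack keeps every session open while `gather` runs and closes all of them, last-entered first, when the `async with` block exits.

```python
import asyncio
from contextlib import AsyncExitStack


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    def __init__(self, name, closed_log):
        self.name = name
        self.closed_log = closed_log
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True
        self.closed_log.append(self.name)


async def login(username, session):
    # The session is still open here because the exit stack has not unwound yet
    assert not session.closed
    await asyncio.sleep(0)  # placeholder for real network I/O
    return f"{username}: logged in"


async def main(usernames):
    closed_log = []
    async with AsyncExitStack() as stack:
        tasks = []
        for u in usernames:
            session = await stack.enter_async_context(FakeSession(u, closed_log))
            tasks.append(login(u, session))
        results = await asyncio.gather(*tasks)
    # Leaving the block closed every session in reverse order of entry
    return results, closed_log


results, closed_log = asyncio.run(main(["a", "b", "c"]))
print(results)     # ['a: logged in', 'b: logged in', 'c: logged in']
print(closed_log)  # ['c', 'b', 'a']
```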

Upvotes: 1

Dima Tisnek

Reputation: 11779

Here's a structure that makes reasoning easier:

async def user(u, p, ...):
    """Everything a single user does"""
    auth = await login(u, p)
    await download_something(auth, ...)
    await post_something(auth, ...)

async def login(u, p):
    async with aiohttp.ClientSession() as session:
        async with session.get("http://xxx/login", ...) as r:
            data = await r.json()
            return data["something"]

async def download_xxx(...): ...
async def post_xxx(...): ...

async def everything():
    creds = [("u1", "p1"), ...]
    flows = [asyncio.ensure_future(user(*cred)) for cred in creds]
    for flow in flows:
        await flow

Caveat programmator: aiohttp's ClientSession stores cookies by default; make sure they don't cross-pollinate your user flows.

Bonus points for: correct use of asyncio.gather() in the last async function.
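One way to collect those bonus points is a variation where each user flow opens one session inside `async with` and all flows run under `asyncio.gather()`. Because no session is shared between users, cookies cannot leak from one flow into another. This is a minimal sketch: `FakeSession`, the token strings, and the fake cookie are hypothetical stand-ins for the real aiohttp calls.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession; each instance has its own cookie dict."""

    def __init__(self):
        self.cookies = {}
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True


async def login(session, u, p):
    await asyncio.sleep(0)  # placeholder for the real login request
    session.cookies["user"] = u  # pretend the server set a per-user cookie
    return f"token-{u}"


async def user(u, p):
    """Everything a single user does, with one session per user."""
    async with FakeSession() as session:
        auth = await login(session, u, p)
        # Later requests (download, post, ...) would reuse `session` and `auth` here
        return auth, dict(session.cookies)


async def everything(creds):
    # gather runs all flows concurrently and returns results in input order
    return await asyncio.gather(*(user(u, p) for u, p in creds))


print(asyncio.run(everything([("u1", "p1"), ("u2", "p2")])))
# [('token-u1', {'user': 'u1'}), ('token-u2', {'user': 'u2'})]
```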

Upvotes: 4

etlsh

Reputation: 701

You didn't really explain what kind of tasks you need. A simple GET?

Something more complicated?

Do you want it to be specific per username/password?

Do you need to save all responses in the end?

For this code, I assumed the username/password doesn't matter, but that is easy to change.

Instead of initializing the sessions separately, as you did, I used the producer/consumer pattern.

Each consumer opens its own session with a context manager. There is also no need for a semaphore, because concurrency is bounded by the number of consumers reading from the queue.

import asyncio
from concurrent.futures import ProcessPoolExecutor

from aiohttp import ClientSession
from bs4 import BeautifulSoup

LOGIN_URL = "http://example.com/login"
CLIENT_CNT = 10
proc_pool = ProcessPoolExecutor(CLIENT_CNT)


def get_key(text):
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"})
    return key.get("value", None)


async def init_consumer(username: str, password: str, queue: asyncio.Queue):
    loop = asyncio.get_running_loop()
    # The session is closed automatically when this consumer exits
    async with ClientSession() as session:
        # Init the session with the creds here? username/password are not used yet
        async with session.get(LOGIN_URL) as login_resp:
            # Parse the token in the process pool so the event loop is not blocked
            x = await loop.run_in_executor(proc_pool, get_key, await login_resp.text())
            print(x)
        url = await queue.get()
        while url is not None:
            # Do things with session and queue
            async with session.get(url) as resp:
                rsp_as_txt = await resp.text()
            queue.task_done()
            url = await queue.get()


async def generate_tasks(queue: asyncio.Queue, consumer_cnt: int):
    tasks = ["http://www.example.com" for _ in range(20)]
    # Putting all tasks in the queue
    for task in tasks:
        await queue.put(task)
    # Waiting for all tasks to finish
    await queue.join()
    # Telling each consumer to finish: one None sentinel per consumer
    for _ in range(consumer_cnt):
        await queue.put(None)


async def run():
    queue = asyncio.Queue(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    consumers = [asyncio.create_task(init_consumer(u, p, queue)) for u, p in zip(usernames, passwords)]
    await generate_tasks(queue, len(consumers))
    # Wait for the consumers so every session is closed before the loop shuts down
    await asyncio.gather(*consumers)


if __name__ == "__main__":
    asyncio.run(run())
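To see why the semaphore is unnecessary in this design, here is a stdlib-only sketch of the same producer/consumer shape under simplifying assumptions: the hypothetical `Resource` class stands in for `ClientSession`, `asyncio.sleep` stands in for a request, and the function names are illustrative. At most one item per consumer is in flight at any time, and each consumer's resource is closed once it receives its `None` sentinel.

```python
import asyncio


class Resource:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.closed = True


async def consumer(queue, stats, resources):
    async with Resource() as res:
        resources.append(res)
        while (item := await queue.get()) is not None:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
            await asyncio.sleep(0.01)  # placeholder for session.get(item)
            stats["active"] -= 1
            stats["done"] += 1
            queue.task_done()


async def run(n_consumers=3, n_items=20):
    queue = asyncio.Queue(maxsize=10)
    stats = {"active": 0, "peak": 0, "done": 0}
    resources = []
    consumers = [asyncio.create_task(consumer(queue, stats, resources)) for _ in range(n_consumers)]
    for i in range(n_items):
        await queue.put(f"http://www.example.com/{i}")
    await queue.join()  # every real item has been processed
    for _ in consumers:
        await queue.put(None)  # one sentinel per consumer
    await asyncio.gather(*consumers)
    return stats, resources


stats, resources = asyncio.run(run())
print(stats["done"], stats["peak"])      # 20 3
print(all(r.closed for r in resources))  # True
```

The queue's `maxsize` only limits how many pending URLs are buffered; the number of consumer tasks is what caps concurrent requests.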

Upvotes: 0
