Reputation: 4681
I'm trying to log into a website simultaneously using multiple credentials with aiohttp
and asyncio
. In the create_tasks
function, I generate a list of sessions to be used for each. The reason I cannot just create a session within the login
function is because the same session object will be used throughout the code. What I'm trying to do is devise a way that I can use a context manager to handle the closing of the session (to avoid the runtime errors of leaving it open).
The following code works as intended (concurrent gathering of the login page and parsing of the token in a process pool), but it generates sessions separately from the tasks and requires me to close them at the end.
from bs4 import BeautifulSoup
from concurrent.futures import ProcessPoolExecutor
import aiohttp
import asyncio
#TODO: make this safe, handle exceptions
LOGIN_URL = "http://example.com/login"  # page that serves the hidden auth token
CLIENT_CNT = 10  # number of concurrent clients / process-pool workers
# Shared pool for CPU-bound HTML parsing, kept off the event loop.
proc_pool = ProcessPoolExecutor(CLIENT_CNT)
def get_key(text):
    """Return the hidden authenticityToken value from a login page, or None.

    Executed in the process pool (CPU-bound HTML parsing), so it must stay
    a picklable module-level function.

    FIX: the original raised AttributeError when the page had no <form>
    or no matching hidden <input>; per the module TODO ("make this safe"),
    fail soft with None instead.
    """
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    if form is None:  # page served no form at all
        return None
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"})
    if key is None:  # token input missing from the form
        return None
    return key.get("value", None)
async def login(username:str, password:str, session:aiohttp.ClientSession, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None):
    """Fetch the login page and print its authenticity token.

    NOTE(review): username/password are accepted but never used -- the
    actual credential POST still appears to be TODO.

    FIX: loop.run_in_executor() already returns an awaitable future, so
    wrapping it in asyncio.ensure_future() was redundant.
    """
    loop = loop or asyncio.get_event_loop()
    async with sem:  # bound the number of in-flight requests
        async with session.get(LOGIN_URL) as resp:
            # Parse the token off the event loop, in the process pool.
            token = await loop.run_in_executor(proc_pool, get_key, await resp.text())
            print(token)
def create_tasks(usernames, passwords, sem:asyncio.BoundedSemaphore, loop:asyncio.AbstractEventLoop=None):
    """Build one login coroutine, with its own ClientSession, per credential pair.

    Returns (tasks, sessions); the caller must close every session after
    the tasks have completed.
    """
    loop = loop or asyncio.get_event_loop()
    pairs = list(zip(usernames, passwords))
    sessions = [aiohttp.ClientSession(loop=loop) for _ in pairs]
    tasks = [login(user, pwd, sess, sem, loop)
             for (user, pwd), sess in zip(pairs, sessions)]
    return tasks, sessions
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # One semaphore slot per pool worker keeps parsing from backing up.
    sem = asyncio.BoundedSemaphore(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    tasks, sessions = create_tasks(usernames, passwords, sem, loop)
    # FIX: asyncio.gather()'s `loop` kwarg was deprecated in 3.8 and removed
    # in 3.10; gather picks up the running loop on its own.
    loop.run_until_complete(asyncio.gather(*tasks))
    for session in sessions:
        # FIX: in aiohttp 3.x ClientSession.close() is a coroutine and must
        # be awaited; older releases returned None, so guard for both.
        closer = session.close()
        if asyncio.iscoroutine(closer):
            loop.run_until_complete(closer)
I previously made create_tasks
a coroutine, wrote a wrapper class to make async iterables, and tried using
async with aiohttp.ClientSession() as session:
tasks.append(login(u, p, session, sem, loop)
But as I feared, it said that the session was already closed by the time it was run.
Upvotes: 7
Views: 1427
Reputation: 425
Use ExitStack.
from contextlib import ExitStack
# Answer sketch (elisions marked with ...): let a contextlib.ExitStack own
# every session, so one `with context:` block closes them all afterwards.
def create_tasks(..., context):
    tasks = []
    for username in usernames:
        session = aiohttp.ClientSession()
        tasks.append(...)
        # Register the session with the caller's ExitStack; it is closed
        # when the stack unwinds.
        context.enter_context(session)
    return tasks

if __name__ == "__main__":
    context = ExitStack()
    tasks = create_tasks(..., context)
    # NOTE(review): ExitStack uses the synchronous __enter__/__exit__
    # protocol; on aiohttp 3.x that usage of ClientSession is deprecated --
    # confirm the target aiohttp version before adopting this.
    with context:
        loop.run_until_complete(asyncio.gather(*tasks))
Upvotes: 1
Reputation: 11779
Here's a structure that makes reasoning easier:
# Answer sketch (elisions marked with ...): give each user one coroutine that
# owns its entire flow, so a session's lifetime matches that user's work.
async def user(u, p, ...):
    """Everything a single user does"""
    auth = await login(u, p)
    await download_something(auth, ...)
    await post_something(auth, ...)

async def login(u, p): ...
    # Session is scoped to the login call via `async with`, so no manual
    # close is ever needed.
    async with aiohttp.ClientSession() as session:
        async with session.get("http://xxx/login", ...) as r:
            data = await r.json()
            return data["something"]

async def download_xxx(...): ...
async def post_xxx(...): ...

async def everything():
    creds = [("u1", "p1"), ...]
    flows = [asyncio.ensure_future(user(cred)) for cred in creds]
    # NOTE(review): awaiting in a loop serializes completion handling; the
    # answer itself suggests asyncio.gather() here instead.
    for flow in flows:
        await flow
Caveat programmator: aiohttp
by default appears to store cookies, make sure it doesn't cross-pollinate your user flows.
Bonus points for: correct use of asyncio.gather()
in the last async function.
Upvotes: 4
Reputation: 701
You didn't really explain what kind of tasks you need — a simple GET?
Something more complicated?
Do you want it to be specific per username/password?
Do you need to save all responses in the end?
For this code, I assumed username/password doesn't matter, but it can change quickly.
Instead of initiating the sessions separately as you did, I used a consumer/producer pattern.
Each consumer opens a session with a context manager; there is also no need for a Semaphore (because of the queue).
import asyncio
from concurrent.futures import ProcessPoolExecutor
from aiohttp import ClientSession
from bs4 import BeautifulSoup
LOGIN_URL = "http://example.com/login"  # page that serves the hidden auth token
CLIENT_CNT = 10  # queue capacity / process-pool workers
# Shared pool for CPU-bound HTML parsing, kept off the event loop.
proc_pool = ProcessPoolExecutor(CLIENT_CNT)
def get_key(text):
    """Return the hidden authenticityToken value from a login page, or None.

    Executed in the process pool (CPU-bound HTML parsing), so it must stay
    a picklable module-level function.

    FIX: the original raised AttributeError when the page had no <form>
    or no matching hidden <input>; fail soft with None instead.
    """
    soup = BeautifulSoup(text, "html.parser")
    form = soup.find("form")
    if form is None:  # page served no form at all
        return None
    key = form.find("input", attrs={"type": "hidden", "name": "authenticityToken"})
    if key is None:  # token input missing from the form
        return None
    return key.get("value", None)
async def init_consumer(username: str, password: str, loop, queue):
    """Worker: owns one ClientSession, primes it via the login page, then
    consumes URLs from `queue` until a None sentinel arrives.

    NOTE(review): username/password are currently unused -- the actual
    credential POST still has to be wired in.

    FIX: loop.run_in_executor() already returns an awaitable future, so
    wrapping it in asyncio.ensure_future() was redundant.
    """
    loop = loop or asyncio.get_event_loop()
    async with ClientSession(loop=loop) as session:
        async with session.get(LOGIN_URL) as login_resp:
            # Parse the CSRF token in the process pool, off the event loop.
            x = await loop.run_in_executor(proc_pool, get_key, await login_resp.text())
            print(x)
        url = await queue.get()
        while url is not None:
            # Do things with session and queue
            async with session.get(url) as resp:
                rsp_as_txt = await resp.text()  # placeholder: response unused so far
            queue.task_done()
            url = await queue.get()
async def generate_tasks(queue):
    """Producer: enqueue the work items, wait until consumers have processed
    them all, then shut the consumers down with None sentinels."""
    tasks = ["http://www.example.com" for _ in range(20)]
    # putting all tasks in queue
    for task in tasks:
        await queue.put(task)
    # FIX: queue.join() is a coroutine -- without `await` it was never
    # executed, so the producer returned before any work was finished.
    await queue.join()
    # FIX: queue.put() is a coroutine too; un-awaited, the None sentinels
    # were never enqueued and every consumer hung forever.
    # NOTE(review): queue.maxsize sentinels are sent; that only shuts all
    # consumers down if maxsize >= the consumer count (10 >= 7 here).
    for _ in range(queue.maxsize):
        await queue.put(None)

async def run(loop):
    """Spin up the consumers, feed them work, and wait for everything
    (producer and consumers) to finish."""
    queue = asyncio.Queue(CLIENT_CNT)
    usernames = ("a", "b", "c", "d", "e", "f", "g")
    passwords = ("a", "b", "c", "d", "e", "f", "g")
    consumers = [asyncio.ensure_future(init_consumer(u, p, loop, queue))
                 for u, p in zip(usernames, passwords)]
    await generate_tasks(queue)
    # FIX: the consumer tasks were never awaited, so the event loop could
    # shut down while they were still pending.
    await asyncio.gather(*consumers)
if __name__ == "__main__":
    # Drive the whole producer/consumer pipeline to completion.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(run(loop=event_loop))
Upvotes: 0