Reputation: 1872
I have a FastAPI application. In this application I want to have a custom logger. In this logger I want to send async requests to some host.
I have an error:
RuntimeError: This event loop is already running
My code:
import asyncio
import functools
class HttpHandler(logging.Handler):
"""
Logging Handler for fluent.
Sends HTTP POST requests
"""
def __init__(self, tag: str, host: str, port: int):
self.tag = tag
self.host = host
self.port = port
self.loop = asyncio.get_event_loop()
self.loop.run_forever()
logging.Handler.__init__(self)
async def emit(self, record: logging.LogRecord):
payload = {} # some data
future = self.loop.run_in_executor(None, functools.partial(requests.post, data={
"url": self.host,
"json": payload
}))
response = await future
In my fastapi app I want to use logger like this:
@app.on_event("startup")
async def startup():
#replace fastapi handlers
logger.handlers = ServiceLogger.get_logger_instance().handlers # special service that gives me custom logger
@app.post("/some_url")
async def foo(body=Depends(get_body)):
logger.info("TEXT")
UPDATE: I was trying to apply aiohttp library
async def emit(self, record: logging.LogRecord):
payload = {
...
"message": record.message # log message
}
await aiohttp.post(self.host, json=payload)
But looks like it's not working because I see warning
RuntimeWarning: coroutine 'HttpHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Upvotes: 1
Views: 830
Reputation: 110311
The problem in your design is that you simply assume that the emit
method when subclassing a logging handler can be an async method.
It is a sync method, and it is documented as so - the fact a few, very convenient, 3rdy party libraries can detect and switch automatically between sync and async methods does not mean "any Python method in ay framework can be either async or sync and everything will just work".
Since you are already under an asyncio loop, it is just a matter of writing it as a sync function, and create and submit to the loop any async tasks from inside it (without trying to directly await those). Just take some care so that you keep a hard-reference to these created tasks, and for they to clean-up after themselves when they are done. A set containing the active tasks will do.
class HttpHandler(logging.Handler):
def __init__(self, tag: str, host: str, port: int):
self.tag = tag
self.host = host
self.port = port
# NO! none of these - Just let the loop be handled by the framework (or awsgi server)
# self.loop = asyncio.get_event_loop()
# self.loop.run_forever()
self.running_tasks = set()
logging.Handler.__init__(self)
# this _must_ be a synchronous, regular, method
def emit(self, record: logging.LogRecord):
payload = {} # some data
loop = asyncio.get_event_loop() # retrieve the running loop here: no race condition upon app initialization;
# + free up the awsgi server to restart/start new loops if it wants to.
async def aux(payload):
return await loop.run_in_executor(None, functools.partial(requests.post, data={
"url": self.host,
"json": payload
})))
future = asyncio.create_task(aux(payload=payload))
# wrapping the future in a task ensure it is scheduled in the running loop.
self.running_tasks.add(future)
# add a callback to remove the reference to the running task:
future.add_done_callback(lambda task: self.running_tasks.remove(task))
Other than the remarks above, this way of doing things will actually take advantage of using aiohttp and httpx combined - your http-views won't have to ẅait until logging is complete" to return, and the actual logging will be completed in background as your server is doing other stuff.
Your previous design tried to await for the aiohttp request meaning that any logging in an http view function you would be serving would delay the completion of the view until after it was done.
(I mean: it will work as is, with requests
, but you can use httpx
instead, and get rid of the run_in_executor
part, just create an httpx.post
call as a task (or better, a wrapper over it that can handle exception in the logging post itself)
Upvotes: 2