Reputation: 291
The example here shows how to create both client & server in python using Remote procedure call (RPC).
But I can't imagine how the FastAPI service might be a server to consume requests from RCP client using pika for RabbitMQ.
Any web service will be requested by calling them explicitly, however, I can't imagine how to integrate RabbitMQ consumer inside web service.
On other hand, for the client it could be easy to do that, by calling the web service explicitly you can publish a request for the queue, see this example
Any help please? or a good start for that?
Upvotes: 5
Views: 16667
Reputation: 504
You can use aio_pika
with RPC pattern and do the following:
Service 1 (consumes)
Consume in a loop:
# app/__init__.py
from fastapi import FastAPI
from app.rpc import consume
app = FastAPI()
...
@app.on_event('startup')
def startup():
loop = asyncio.get_event_loop()
# use the same loop to consume
asyncio.ensure_future(consume(loop))
...
Create connection, channel and register remote methods to be called from another service:
# app/rpc.py
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from app.config import config
__all__ = [
'consume'
]
def remote_method():
# DO SOMETHING
# Move this method along with others to another place e.g. app/rpc_methods
# I put it here for simplicity
return 'It works!'
async def consume(loop):
connection = await connect_robust(config.AMQP_URI, loop=loop)
channel = await connection.channel()
rpc = await RPC.create(channel)
# Register your remote method
await rpc.register('remote_method', remote_method, auto_delete=True)
return connection
That's all you need to consume and respond now let's see the second service that calls this remote method.
Service 2 (calls remote method)
Let's create RPC middleware first to easily manage and access RPC object to call our remote methods from API functions:
# app/utils/rpc_middleware.py
import asyncio
from fastapi import Request, Response
from aio_pika import connect_robust
from aio_pika.patterns import RPC
from app.config import config
__all__ = [
'get_rpc',
'rpc_middleware'
]
async def rpc_middleware(request: Request, call_next):
response = Response("Internal server error", status_code=500)
try:
# You can also pass a loop as an argument. Keep it here now for simplicity
loop = asyncio.get_event_loop()
connection = await connect_robust(config.AMQP_URI, loop=loop)
channel = await connection.channel()
request.state.rpc = await RPC.create(channel)
response = await call_next(request)
finally:
# UPD: just thought that we probably want to keep queue and don't
# recreate it for each request so we can remove this line and move
# connection, channel and rpc initialisation out from middleware
# and do it once on app start
# Also based of this: https://github.com/encode/starlette/issues/1029
# it's better to create ASGI middleware instead of HTTP
await request.state.rpc.close()
return response
# Dependency to use rpc inside routes functions
def get_rpc(request: Request):
rpc = request.state.rpc
return rpc
Apply RPC middleware:
# app/__init__.py
from app.utils import rpc_middleware
...
app.middleware('http')(rpc_middleware)
...
Use RPC object via dependency in an API function:
# app/api/whatever.py
from aio_pika.patterns import RPC
from app.utils import get_rpc
...
@router.get('/rpc')
async def rpc_test(rpc: RPC = Depends(get_rpc)):
response = await rpc.proxy.remote_method()
...
Add some logging to track what's happening in both services. Also you can combine RPC logic from both services into one to be able to consume and call remote methods from whithin the same service.
Hope it helps to get basic idea.
Upvotes: 10