How to use FastAPI as consumer for RabbitMQ (RPC) How to use FastAPI as consumer for RabbitMQ (RPC) flask flask

How to use FastAPI as consumer for RabbitMQ (RPC)


You can use aio_pika with RPC pattern and do the following:

Service 1 (consumes)

Consume in a loop:

# app/__init__.pyfrom fastapi import FastAPIfrom app.rpc import consumeapp = FastAPI()...@app.on_event('startup')def startup():    loop = asyncio.get_event_loop()    # use the same loop to consume    asyncio.ensure_future(consume(loop))...

Create connection, channel and register remote methods to be called from another service:

# app/rpc.pyfrom aio_pika import connect_robustfrom aio_pika.patterns import RPCfrom app.config import config__all__ = [    'consume']def remote_method():    # DO SOMETHING    # Move this method along with others to another place e.g. app/rpc_methods    # I put it here for simplicity    return 'It works!'async def consume(loop):    connection = await connect_robust(config.AMQP_URI, loop=loop)    channel = await connection.channel()    rpc = await RPC.create(channel)    # Register your remote method    await rpc.register('remote_method', remote_method, auto_delete=True)    return connection

That's all you need to consume and respond now let's see the second service that calls this remote method.

Service 2 (calls remote method)

Let's create RPC middleware first to easily manage and access RPC object to call our remote methods from API functions:

# app/utils/rpc_middleware.pyimport asynciofrom fastapi import Request, Responsefrom aio_pika import connect_robustfrom aio_pika.patterns import RPCfrom app.config import config__all__ = [    'get_rpc',    'rpc_middleware']async def rpc_middleware(request: Request, call_next):    response = Response("Internal server error", status_code=500)    try:        # You can also pass a loop as an argument. Keep it here now for simplicity        loop = asyncio.get_event_loop()        connection = await connect_robust(config.AMQP_URI, loop=loop)        channel = await connection.channel()        request.state.rpc = await RPC.create(channel)        response = await call_next(request)    finally:        # UPD: just thought that we probably want to keep queue and don't        # recreate it for each request so we can remove this line and move        # connection, channel and rpc initialisation out from middleware         # and do it once on app start        # Also based of this: https://github.com/encode/starlette/issues/1029        # it's better to create ASGI middleware instead of HTTP        await request.state.rpc.close()    return response# Dependency to use rpc inside routes functionsdef get_rpc(request: Request):    rpc = request.state.rpc    return rpc

Apply RPC middleware:

# app/__init__.pyfrom app.utils import rpc_middleware...app.middleware('http')(rpc_middleware)...

Use RPC object via dependency in an API function:

# app/api/whatever.pyfrom aio_pika.patterns import RPCfrom app.utils import get_rpc...@router.get('/rpc')async def rpc_test(rpc: RPC = Depends(get_rpc)):    response = await rpc.proxy.remote_method()    ...

Add some logging to track what's happening in both services. Also you can combine RPC logic from both services into one to be able to consume and call remote methods from whithin the same service.

Hope it helps to get basic idea.


There is a great tutorial from youtube released today (April 28) by Andrej Baranovskij that talks about it.

I will provide the link below. You can check the github source code as well.

Video - Fastapi and RabbitMQ

Source code