Julian
Julian

Reputation: 556

Concurrent calls to Phoenix using phoenixdb python package

I am currently stress testing an API that relies on the phoenixdb Python package to execute select queries on a Phoenix view. However, it appears that the calls made using phoenixdb are synchronous in nature, causing requests to be answered in a synchronous manner.

Currently, I only found a workaround by encapsulating the phoenixdb call within a dedicated event loop, like this :

import asyncio
from phoenixdb import connect
from fastapi import FastAPI

app = FastAPI()

# This takes around 5 seconds
def synchronous_phoenixdb_query():
    database_url = 'http://localhost:8765/'
    connection = connect(database_url, autocommit=True)
    cursor = connection.cursor()

    # Execute a query
    cursor.execute('SELECT * FROM my_table')

    result = cursor.fetchall()

    cursor.close()
    connection.close()

    return result

# Asynchronous wrapper
async def asynchronous_phoenixdb_query():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, synchronous_phoenixdb_query)
    return result

@app.get("/phoenixdb_query")
async def phoenixdb_query():
    result = await asynchronous_phoenixdb_query()
    return {"result": result}

However I am not sure about the limits of this approach.

Upvotes: 0

Views: 68

Answers (1)

lsabi
lsabi

Reputation: 4476

The best thing to do, in my opinion, is to create a class that wraps and maintains the status of the connection. If available, it is better to use a connection pool, but I don't know much about this db in particular.

class PhoenixAsyncWrapper:
    def __init__(self, **kwargs):
        # Add setup/config paramters

    async def connect(self):
        # perform the connection to the database and save it into self._db
          or something similar
        # Note that the database connection should be a pool of connections
          (i.e. spawning multiple threads that can interact with the db in parallel)

    async def query(self, query) -> str:
        # Perform a query spawning a synchronous connection
          from the pool and running it in the executor

    async def close(self):
        # Do a graceful shutdown

Upvotes: 1

Related Questions