Artem
Artem

Reputation: 833

SQLAlchemy - pass a dynamic tablename to query function?

I have a simple polling script that polls entries based on new ID's in a MSSQL table. I'm using SQLAlchemy's ORM to create a table class and then query that table. I want to be able to add more tables "dynamically" without coding it directly into the method.

My polling function:

def poll_db():
    query = db.query(
        Transactions.ID).order_by(Transactions.ID.desc()).limit(1)

    # Continually poll for new images to classify
    max_id_query = query

    last_max_id = max_id_query.scalar()
    
    while True:

        max_id = max_id_query.scalar()
        if max_id > last_max_id:
            print(
                f"New row(s) found. "
                f"Processing ids {last_max_id + 1} through {max_id}"
            )
            # Insert ML model
            id_query = db.query(Transactions).filter(
                Transactions.ID > last_max_id)

            df_from_query = pd.read_sql_query(
                id_query.statement, db.bind, index_col='ID')

            print(f"New query was made")

            last_max_id = max_id
        time.sleep(5)

My table model:

import sqlalchemy as db
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import defer, relationship, query

from database import SessionLocal, engine

insp = db.inspect(engine)
db_list = insp.get_schema_names()

Base = declarative_base(cls=BaseModel)

class Transactions(Base):
    __tablename__ = 'simulation_data'
    sender_account = db.Column('sender_account', db.BigInteger)
    recipient_account = db.Column('recipient_account', db.String)
    sender_name = db.Column('sender_name', db.String)
    recipient_name = db.Column('recipient_name', db.String)
    date = db.Column('date', db.DateTime)
    text = db.Column('text', db.String)
    amount = db.Column('amount', db.Float)
    currency = db.Column('currency', db.String)
    transaction_type = db.Column('transaction_type', db.String)
    fraud = db.Column('fraud', db.BigInteger)
    swift_bic = db.Column('swift_bic', db.String)
    recipient_country = db.Column('recipient_country', db.String)
    internal_external = db.Column('internal_external', db.String)
    ID = Column('ID', db.BigInteger, primary_key=True)

QUESTION

How can I pass the table class name "dynamically" in the likes of poll_db(tablename), where tablename='Transactions', and instead of writing similar queries for multiple tables, such as:

query = db.query(Transactions.ID).order_by(Transactions.ID.desc()).limit(1)       


query2 = db.query(Transactions2.ID).order_by(Transactions2.ID.desc()).limit(1)
        

query3 = db.query(Transactions3.ID).order_by(Transactions3.ID.desc()).limit(1)
        

The tables will have identical structure, but different data.

Upvotes: 0

Views: 819

Answers (1)

Kostas Mouratidis
Kostas Mouratidis

Reputation: 1265

I can't give you a full example right now (will edit later) but here's one hacky way to do it (the documentation will probably be a better place to check):

def dynamic_table(tablename):
    for class_name, cls in Base._decl_class_registry.items():
        if cls.__tablename__ == tablename:
            return cls

Transactions2 = dynamic_table("simulation_data")
assert Transactions2 is Transactions

The returned class is the model you want. Keep in mind that Base can only access the tables that have been subclassed already so if you have them in other modules you need to import them first so they are registered as Base's subclasses.

For selecting columns, something like this should work:

def dynamic_table_with_columns(tablename, *columns):
    cls = dynamic_table(tablename)
    subset = []
    for col_name in columns:
        column = getattr(cls, col_name)
        if column:
            subset.append(column)
    
    # in case no columns were given
    if not subset:
        return db.query(cls)

    return db.query(*subset)

Upvotes: 1

Related Questions