Reputation: 833
I have a simple polling script that polls for new entries in an MSSQL table based on new IDs. I'm using SQLAlchemy's ORM to create a table class and then query that table.
I want to be able to add more tables "dynamically" without coding each one directly into the method.
My polling function:
def poll_db(model=None, poll_interval=5):
    """Poll a table forever and load rows whose ID is newer than the last one seen.

    Args:
        model: Mapped class to poll; it must expose an ``ID`` column.
            Defaults to ``Transactions`` so existing ``poll_db()`` calls
            behave as before. Pass another model with the same structure
            to poll a different table.
        poll_interval: Seconds to sleep between polls (default 5).

    This function loops indefinitely and never returns.

    NOTE(review): ``db`` is used here as a session-like object
    (``db.query``, ``db.bind``) -- confirm it is not the
    ``sqlalchemy as db`` module alias used by the model module.
    """
    if model is None:
        model = Transactions

    # Highest ID currently in the table; ``scalar()`` yields None when the
    # table has no rows.
    max_id_query = db.query(model.ID).order_by(model.ID.desc()).limit(1)
    last_max_id = max_id_query.scalar()

    # Continually poll for new rows to process.
    while True:
        max_id = max_id_query.scalar()
        # Comparing an int with None raises TypeError, so an empty table
        # (at start-up or while polling) is handled explicitly.
        has_new_rows = max_id is not None and (
            last_max_id is None or max_id > last_max_id
        )
        if has_new_rows:
            # Cap the fetch at max_id: rows inserted after the max-ID query
            # ran are picked up on the next pass instead of being loaded twice.
            id_query = db.query(model).filter(model.ID <= max_id)
            if last_max_id is None:
                print(f"New row(s) found. Processing ids up through {max_id}")
            else:
                print(
                    f"New row(s) found. "
                    f"Processing ids {last_max_id + 1} through {max_id}"
                )
                id_query = id_query.filter(model.ID > last_max_id)
            # Insert ML model (placeholder: the frame is not consumed yet).
            df_from_query = pd.read_sql_query(
                id_query.statement, db.bind, index_col='ID')
            print("New query was made")
            last_max_id = max_id
        time.sleep(poll_interval)
My table model:
import sqlalchemy as db
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import defer, relationship, query
from database import SessionLocal, engine
# Inspector for the engine imported from the project's `database` module.
insp = db.inspect(engine)
# Schema names reported by the inspector.
db_list = insp.get_schema_names()
# Declarative base class that the ORM models subclass.
# NOTE(review): BaseModel is not imported or defined in the visible snippet --
# confirm where it comes from.
Base = declarative_base(cls=BaseModel)
class Transactions(Base):
    """ORM model mapped to the ``simulation_data`` table.

    Every column is declared with an explicit database column name equal to
    its attribute name. ``ID`` is the primary key.
    """

    __tablename__ = 'simulation_data'

    # Sender / recipient details.
    sender_account = db.Column('sender_account', db.BigInteger)
    recipient_account = db.Column('recipient_account', db.String)
    sender_name = db.Column('sender_name', db.String)
    recipient_name = db.Column('recipient_name', db.String)

    # Transaction details.
    date = db.Column('date', db.DateTime)
    text = db.Column('text', db.String)
    amount = db.Column('amount', db.Float)
    currency = db.Column('currency', db.String)
    transaction_type = db.Column('transaction_type', db.String)
    fraud = db.Column('fraud', db.BigInteger)
    swift_bic = db.Column('swift_bic', db.String)
    recipient_country = db.Column('recipient_country', db.String)
    internal_external = db.Column('internal_external', db.String)

    # Primary key; declared last so the column order stays unchanged.
    ID = db.Column('ID', db.BigInteger, primary_key=True)
QUESTION
How can I pass the table class name "dynamically", along the lines of poll_db(tablename) where tablename='Transactions', instead of writing similar queries for multiple tables, such as:
# The repetitive pattern to avoid: one near-identical "latest ID" query per model.
# Transactions2 / Transactions3 stand for further models (not defined in this snippet).
query = db.query(Transactions.ID).order_by(Transactions.ID.desc()).limit(1)
query2 = db.query(Transactions2.ID).order_by(Transactions2.ID.desc()).limit(1)
query3 = db.query(Transactions3.ID).order_by(Transactions3.ID.desc()).limit(1)
The tables will have identical structure, but different data.
Upvotes: 0
Views: 819
Reputation: 1265
I can't give you a full example right now (will edit later) but here's one hacky way to do it (the documentation will probably be a better place to check):
def dynamic_table(tablename, base=None):
    """Return the registered model class whose ``__tablename__`` is *tablename*.

    Args:
        tablename: Table name to look for among the registered classes.
        base: Declarative base whose class registry is searched. Defaults to
            the module-level ``Base``.

    Returns:
        The matching class, or ``None`` when no registered class declares
        that table name.
    """
    if base is None:
        base = Base

    # ``_decl_class_registry`` is the private registry this lookup was written
    # against; it is reportedly gone in SQLAlchemy 1.4+, where the equivalent
    # mapping is ``base.registry._class_registry`` -- hence the fallback.
    registry = getattr(base, "_decl_class_registry", None)
    if registry is None:
        registry = base.registry._class_registry

    for cls in registry.values():
        # The registry can hold bookkeeping entries that are not models and
        # have no __tablename__; skip them instead of raising AttributeError.
        if hasattr(cls, "__tablename__") and cls.__tablename__ == tablename:
            return cls
    return None
# Look the model up by its __tablename__ instead of referring to the class by name.
Transactions2 = dynamic_table("simulation_data")
# The lookup is expected to return the same class object declared as Transactions.
assert Transactions2 is Transactions
The returned class is the model you want. Keep in mind that Base can only see models that have already been defined as its subclasses, so if your models live in other modules you need to import those modules first so that the classes are registered with Base.
For selecting columns, something like this should work:
def dynamic_table_with_columns(tablename, *columns):
    """Build a query on the model for *tablename*, optionally limited to some columns.

    Args:
        tablename: ``__tablename__`` of the model, resolved via
            ``dynamic_table``.
        *columns: Names of attributes on the model to select. When none are
            given, the query selects the whole model.

    Returns:
        The object returned by ``db.query(...)``.

    Raises:
        AttributeError: If a name in *columns* is not an attribute of the
            resolved model (this includes the case where ``dynamic_table``
            returned ``None`` because no model matched).
    """
    cls = dynamic_table(tablename)
    requested = (getattr(cls, col_name) for col_name in columns)
    # Test against None explicitly: truthiness of SQL expression objects is
    # not a presence check, and some SQLAlchemy expressions reportedly raise
    # TypeError when evaluated as a boolean.
    subset = [column for column in requested if column is not None]

    # in case no columns were given
    if not subset:
        return db.query(cls)
    return db.query(*subset)
Upvotes: 1