Shine J
Shine J

Reputation: 818

Get changed columns and values from output of a do_orm_execute event

I am trying to log the columns/values changed during an insert, update, delete statement using Session events.

I am able to print the statement with their params, however I need the column/value pair (ex. {"dept": "HR"} or {"dept": {"old": "Finance", "new": "HR"}).

Tried get_history() and inspect() but maybe I am doing it wrong. If possible, please point me to the right documentation/article as well so I can understand more about this. This is the code I have so far:

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import Session
from sqlalchemy import create_engine
from sqlalchemy import event, inspect
from sqlalchemy.orm.attributes import get_history
from random import randint


class Base(DeclarativeBase):
    pass

class Employee(Base):

    __tablename__ = 'employee'

    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    dept = Column(String, nullable=False)


class DBAPI:

    def __init__(self):
        self.db_path = 'test.db'
        self.engine = create_engine(
            f"sqlite:///{self.db_path}", echo=False, echo_pool=False
        )
        Base.metadata.create_all(self.engine)
        #event.listen(Employee, "after_insert", self.test)
        #event.listens_for(Employee, "after_update", self.test_1)
        #event.listen(Employee, "after_update", self.test_1)

    def add_record(self, name):
        with Session(self.engine) as session:
            session.add(
                Employee(
                    name=name, steps="Finance"
                )
            )
            session.commit()

    def update_record(self, dept):
        # ORMExecuteState
        def testing(state):
            # print the statement with values
            st = state.statement.compile(
               compile_kwargs={"literal_binds": True}
            )
            print(st)
            # print(state.is_update)
            # print(state.all_mappers[0].column_attrs)
            # mapper = state.all_mappers[0]
            # print([a.key for a in mapper.column_attrs])
            # colattrs = [a for a in mapper.column_attrs]
            # print([a for a in dir(colattrs[1]) if not a.startswith('_')])
            # print(colattrs[1].active_history)
            
            #print(state)
            #print(state.statement)
            #print(state.parameters)
            #print(state.user_defined_options)
            ##print(state.statement.column_descriptions)
            #print(state.all_mappers)
            #print(state.execution_options)
            ##print(state.load_options)
            #print(state.parameters)
            #print(state.statement.compile().params)

            #st = state.statement.compile(
            #    compile_kwargs={"literal_binds": True}
            #)
            #print(st)
            #print(type(state.statement))
            #print(state.bind_arguments)
            #mapper = state.bind_arguments.get("mapper")
            #print(type(mapper))
            #print(type(mapper.attrs))
            #for s in mapper.attrs:
            #    print(s)
            #print([
            #    attr for attr in dir(state.statement)
            #    if not attr.startswith('_')
            #])
            #print(state.statement.corresponding_column())
            #for col in state.statement.exported_columns:
            #    print(col)

            #insp = inspect(state.statement)
            #print(type(insp))
            #for attr in insp.attrs:
            #    print(attr)
            #print(insp)
            #for attr in insp.attrs:
            #    print(attr)
            #    hist = get_history(attr, "steps")

            #print(state.statement.entity_description)
            #params = state.statement.compile().params
            #statement = state.statement
            #for param, val in params.items():
            #    statement = statement.replace(
            #        param, f"{

        with Session(self.engine) as session:
            event.listen(session, "do_orm_execute", testing)
            #event.listen(session, "after_commit", testing1)
            session.query(
                Employee
            ).filter(
                Employee.name == "Bob"
            ).update(
                {
                    Employee.dept: dept
                }
            )
            session.commit()



dbapi = DBAPI()
dbapi.update_record("HR")

Upvotes: 0

Views: 28

Answers (0)

Related Questions