Reputation: 4339
I am trying to generate dynamic sql queries based on certain conditions in SQLAlchemy
. I have a model that is gets queried constantly based on different conditions
The code goes like this:
condition 1:
allowed_names = ['a', 'b', 'c']
join_condition = f'and_(Parent.parent_id==Child.parent_id, Child.name.in_({allowed_names}))'
Parent.children = relationship('Child', primaryjoin=join_condition, lazy='selectin')
condition 2:
allowed_names = ['x', 'y', 'z']
join_condition = f'and_(Parent.parent_id==Child.parent_id, Child.name.in_({allowed_names}))'
Parent.children = relationship('Child', primaryjoin=join_condition, lazy='selectin')
I get results of the query the following way:
res = Parent.filter(Parent.parent_id.in_([1, 2, 3])).limit(100).offset(1).all()
If I run the query based on condition 1 first and run the query based on condition 2 again without stopping the program, it returns results based on query 1 since it ran first. After printing out the sql query that gets executed, I figured out that it only runs the condition that was executed first.
Does SQLALchemy cache the string query? I noticed the old value of allowed_names in the filter condition in the query
[cached since 7.236s ago] {'name_1_1': 'a', 'name_1_2': 'b', 'name_1_3': 'c'}
Am I missing something here, or is it a SQLAlchemy bug??
Upvotes: 1
Views: 1606
Reputation: 1
I don't think any of these solutions is ok. Filtering on python side is just wrong, if you have too many rows returned. And filtering with and_ also doesn't work well, if you are using aliased class with dynamic alias generation. I don't have any good solution for this except disabling caching.
Edit: i found this: expire_all
Upvotes: -1
Reputation: 757
Building on @Ian Wilson's answer.
Rather than setting up a single relationship and filtering in python, or the alternative as described where you have a different relationship for each variation (which doesn't work if your inputs are undeterminable prior to execution).
The preferred method is to apply additional criteria to the loader options as per the SQLAlchemy documentation.
q = session.query(Parent).options(
selectin(
Parent.children.and_(Parent.parent_id.in_[1, 2, 3]))
)
Upvotes: 1
Reputation: 9099
I don't think relationship
is meant to be used dynamically like this. Sqlalchemy probably extracts the information just the first time the class is loaded. So when you dynamically change the relationship property later nothing changes.
Also lazy="selectin"
actually eager loads this information which is more confusing if you are trying to dynamically change it.
I think you could probably just use "selectin" to load all the children for each parent and then just filter those children with python if there are not that many of them. Then you don't have to manage multiple relationships.
I guess it depends on if there are a lot of children per parent or not.
class Parent(Base):
__tablename__ = "parents"
id = Column(Integer, primary_key=True, index=True)
@property
def abc_children(self):
return [child for child in self.children of child.name in ["a" , "b", "c"]]
@property
def xyz_children(self):
return [child for child in self.children of child.name in ["x" , "y", "z"]]
children = relationship(Child, primaryjoin = "Parent.id==Child.parent_id", backref="parent", lazy="selectin")
# Then this would be accessed with
parent.xyz_children
# or
parent.abc_children
.options()
Otherwise you could load a viewonly=True
property on demand using options but that is kind of messy.
class Child(Base):
__tablename__ = "childs"
id = Column(Integer, primary_key=True, index=True)
parent_id = Column(Integer, ForeignKey('parents.id'), nullable=True)
name = Column(String)
class Parent(Base):
__tablename__ = "parents"
id = Column(Integer, primary_key=True, index=True)
abc_names = ["a" , "b", "c"]
abc_children = relationship(Child, primaryjoin = f"and_(Parent.id==Child.parent_id, Child.name.in_({abc_names}))", viewonly=True)
xyz_names = ["x" , "y", "z"]
xyz_children = relationship(Child, primaryjoin = f"and_(Parent.id==Child.parent_id, Child.name.in_({xyz_names}))", viewonly=True)
children = relationship(Child, primaryjoin = "Parent.id==Child.parent_id", backref="parent")
if __name__ == '__main__':
metadata.create_all(engine)
with Session(engine) as session:
# Test data
p = Parent()
session.add(p)
p.children.extend(Child(name=name) for name in ["a", "b", "z"])
session.add_all(p.children)
session.commit()
# This loads all 3 relationships, the two filtered and the entire list, you could pick or choose.
p = session.query(Parent).filter(Parent.id == p.id).options(
selectinload(Parent.abc_children),
selectinload(Parent.xyz_children),
selectinload(Parent.children)).first()
assert len(p.abc_children) == 2
assert len(p.xyz_children) == 1
assert len(p.children) == 3
Upvotes: 2