Reputation: 2088
I have two tables bound by a M2M relationship. Books and Writers, writers can have many books and books can have many writers.
I want to have a `count` property on both books and writers so that I can sort by it, for example to find the writer who wrote the most books.
# Many-to-many association table linking books and writers.
# Both foreign-key columns are flagged primary_key=True, so together they
# form the table's composite primary key.
book_writer_association_table = Table(
    'book_writer_association',
    Base.metadata,
    Column('book_id', ForeignKey('book.id'), primary_key=True),
    # Named 'writer_id' (previously 'Writer') so that both columns follow the
    # same '<table>_id' naming convention.
    Column('writer_id', ForeignKey('writer.id'), primary_key=True),
)
class Book(Base):
    """A book; related to its writers through the association table."""

    # Must be 'book': the association table's foreign key targets 'book.id',
    # which cannot be resolved if this table is called anything else.
    __tablename__ = 'book'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # The target is given by name ("Writer") because the Writer class is
    # defined after this one; using the bare class here would raise NameError
    # while this class body is being executed.
    writers = relationship(
        "Writer",
        secondary=book_writer_association_table,
        back_populates="books",
    )
class Writer(Base):
    """A writer; related to their books through the association table."""

    __tablename__ = 'writer'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # 'secondary' (previously misspelled 'secondery') names the association
    # table that carries the many-to-many link.
    books = relationship(
        Book,
        secondary=book_writer_association_table,
        back_populates="writers",
    )

    @hybrid_property
    def book_count(self):
        """Return the number of books in this writer's ``books`` collection."""
        return len(self.books)

    @book_count.expression
    def book_count(cls):
        # what goes here?
        # Placeholder only: the class-level (SQL) expression is the open
        # question of this post and is intentionally left unimplemented.
        ...
I tried various approaches, such as the one detailed here:
class Foo(Base):
    """Example model that references a Bar row through ``bar_id``."""

    __tablename__ = 'foo'

    id = Column(Integer, primary_key=True)
    # Foreign key to the 'bar' table; backs the ``bar`` relationship below.
    bar_id = Column(Integer, ForeignKey('bar.id'))
    bar = relationship('Bar')
class Bar(Base):
    """Example model exposing ``foo_count``, the number of Foo rows pointing at it."""

    __tablename__ = 'bar'

    id = Column(Integer, primary_key=True)

    @hybrid_property
    def foo_count(self):
        """Instance level: count the Foo rows whose ``bar`` is this object.

        Runs a query on the session this instance is attached to.
        """
        session = object_session(self)
        matching_foos = session.query(Foo).filter(Foo.bar == self)
        return matching_foos.count()

    @foo_count.expression
    def foo_count(cls):
        """Class level: a labelled SELECT count(foo.id) restricted to this Bar's id."""
        counted = select([func.count(Foo.id)])
        per_bar = counted.where(Foo.bar_id == cls.id)
        return per_bar.label('foo_count')
However, in this example there are only two tables, and I'm unsure how to achieve a more complicated join here. Another user suggested using `column_property`, but I ran into exactly the same problem there: I'm unsure how to add further tables to the join.
Upvotes: 3
Views: 765
Reputation: 4707
You can adapt the idea from here to the M2M case. To do so, reference the association table in the `hybrid_property` instead of the `Book` table. That way you eliminate the join with the `Book` table and reduce your case to a one-to-many relation.
I came up with this solution.
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String, select, func, create_engine, Table
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, object_session, sessionmaker, Session
# Declare models
@as_declarative()
class Base:
    """Declarative base that the mapped classes in this example inherit from."""
# Association table for the Book <-> Writer many-to-many relationship.
# Both foreign-key columns are flagged primary_key=True, so together they
# form the composite primary key.
book_writer_association_table = Table(
    'book_writer_association',
    Base.metadata,
    Column('book_id', ForeignKey('book.id'), primary_key=True),
    Column('writer_id', ForeignKey('writer.id'), primary_key=True),
)
class Book(Base):
    """A book; related to its writers through ``book_writer_association_table``."""

    __tablename__ = 'book'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Many-to-many link; mirrored by ``Writer.books`` via back_populates.
    writers = relationship(
        "Writer",
        secondary=book_writer_association_table,
        back_populates="books",
    )
class Writer(Base):
    """A writer; related to books through ``book_writer_association_table``."""

    __tablename__ = 'writer'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Many-to-many link; mirrored by ``Book.writers`` via back_populates.
    books = relationship(
        "Book",
        secondary=book_writer_association_table,
        back_populates="writers",
    )

    @hybrid_property
    def book_count(self):
        """Return the number of books linked to this writer (instance level).

        When the instance is attached to a session, the association rows for
        this writer are counted with a query; the ``Book`` table is not
        involved. When it is not attached to any session, the length of the
        in-memory ``books`` collection is returned instead.
        """
        session = object_session(self)
        if session is None:
            # No session to query through (e.g. a newly constructed Writer
            # that was never added). Previously this path failed with
            # AttributeError on ``None.query``.
            return len(self.books)
        return (
            session.query(book_writer_association_table)
            .filter(book_writer_association_table.c.writer_id == self.id)
            .count()
        )

    @book_count.expression
    def book_count(cls):
        """Return the class-level SQL expression for the book count.

        Builds a labelled SELECT count(book_id) over the association table,
        restricted to rows whose ``writer_id`` equals this class's ``id``
        column, so it can be selected alongside ``Writer``.
        """
        return (
            select([func.count(book_writer_association_table.c.book_id)])
            .where(book_writer_association_table.c.writer_id == cls.id)
            .label('book_count')
        )
# Load DB schema
engine = create_engine('sqlite:///sqlite3.db')
Base.metadata.create_all(engine)

# Use an ordinary transactional session and commit explicitly: the legacy
# ``autocommit=True`` session mode is deprecated in SQLAlchemy 1.4 and
# removed in 2.0.
SessionLocal = sessionmaker(bind=engine)
db: Session = SessionLocal()

# Creating test instances
b1 = Book(name="Book 1")
b2 = Book(name="Book 2")
db.add(b1)
db.add(b2)

w1 = Writer(name="Writer 1")
w2 = Writer(name="Writer 2")
db.add(w1)
db.add(w2)

# Writer 1 is linked to both books, Writer 2 only to Book 1.
b1.writers.append(w1)
b1.writers.append(w2)
b2.writers.append(w1)

# Persist the new rows and their association links.
db.commit()

query = db.query(Writer, Writer.book_count)
print(str(query))  # checking query
print()

# Each result row is a (Writer, book_count) pair, not a bare Writer, so the
# former ``List[Writer]`` annotation was incorrect and has been dropped.
writers = query.all()  # testing query
for writer, book_count in writers:
    print(f"{writer.name}: {book_count}")

# Release the connection held by the session.
db.close()
Result:
> Writer 1: 2
> Writer 2: 1
I'm unsure how to further add tables to the join.
The SQL generated by `db.query(Writer, Writer.book_count)` looks clean, without any joins, so I think you shouldn't have any problems with subsequent joins.
> SELECT writer.id AS writer_id, writer.name AS writer_name, (SELECT count(book_writer_association.book_id) AS count_1
> FROM book_writer_association
> WHERE book_writer_association.writer_id = writer.id) AS book_count
> FROM writer
Edit: If you need to join the `Book` table to provide additional filtering, you can do it like this. Here I count only the books with a price greater than 150 (`Book.price > 150`).
from typing import List
from sqlalchemy import Column, ForeignKey, Integer, String, select, func, create_engine, Table
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, object_session, sessionmaker, Session
# Declare models
@as_declarative()
class Base:
    """Declarative base for the models of the price-filtering example."""
# Many-to-many link table between 'book' and 'writer'.
# The two foreign keys are both primary-key columns, i.e. a composite key.
book_writer_association_table = Table(
    'book_writer_association',
    Base.metadata,
    Column('book_id', ForeignKey('book.id'), primary_key=True),
    Column('writer_id', ForeignKey('writer.id'), primary_key=True),
)
class Book(Base):
    """A priced book; related to its writers through the association table."""

    __tablename__ = 'book'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # Integer price; ``Writer.book_count`` filters on this column.
    price = Column(Integer)
    writers = relationship(
        "Writer",
        secondary=book_writer_association_table,
        back_populates="books",
    )
class Writer(Base):
    """A writer whose ``book_count`` counts only books priced above 150."""

    __tablename__ = 'writer'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    books = relationship(
        "Book",
        secondary=book_writer_association_table,
        back_populates="writers",
    )

    @hybrid_property
    def book_count(self):
        """Return how many of this writer's books cost more than 150.

        When the instance is attached to a session, the count is computed by
        a query joining the association table to ``Book``. When it is not
        attached to any session, the in-memory ``books`` collection is
        filtered with the same price condition instead.
        """
        session = object_session(self)
        if session is None:
            # No session to query through (e.g. a newly constructed Writer).
            # Previously this path failed with AttributeError on
            # ``None.query``. Books without a price are skipped, matching SQL
            # where ``NULL > 150`` is not true.
            return sum(
                1
                for book in self.books
                if book.price is not None and book.price > 150
            )
        return (
            session.query(book_writer_association_table)
            .join(Book, Book.id == book_writer_association_table.c.book_id)
            .filter(book_writer_association_table.c.writer_id == self.id)
            .filter(Book.price > 150)
            .count()
        )

    @book_count.expression
    def book_count(cls):
        """Return the class-level SQL expression for the filtered book count.

        Builds a labelled SELECT count(book_id) over the association table
        joined to ``Book``, restricted to this writer's id and to books with
        ``price > 150``. The threshold must stay in sync with the
        instance-level getter above.
        """
        # ``select()`` is called with positional columns rather than a list:
        # this block already relies on the 1.4+ ``Select.join``, and the list
        # form is rejected by SQLAlchemy 2.0.
        return (
            select(func.count(book_writer_association_table.c.book_id))
            .join(Book, Book.id == book_writer_association_table.c.book_id)
            .where(book_writer_association_table.c.writer_id == cls.id)
            .where(Book.price > 150)
            .label('book_count')
        )
# Load DB schema
engine = create_engine('sqlite:///sqlite3.db')
Base.metadata.create_all(engine)

# Use an ordinary transactional session and commit explicitly: the legacy
# ``autocommit=True`` session mode is deprecated in SQLAlchemy 1.4 and
# removed in 2.0.
SessionLocal = sessionmaker(bind=engine)
db: Session = SessionLocal()

# Creating test instances: one book below and one above the 150 threshold.
b1 = Book(name="Book 1", price=100)
b2 = Book(name="Book 2", price=200)
db.add(b1)
db.add(b2)

w1 = Writer(name="Writer 1")
w2 = Writer(name="Writer 2")
db.add(w1)
db.add(w2)

# Writer 1 is linked to both books, Writer 2 only to the cheaper Book 1.
b1.writers.append(w1)
b1.writers.append(w2)
b2.writers.append(w1)

# Persist the new rows and their association links.
db.commit()

query = db.query(Writer, Writer.book_count)
print(str(query))  # checking query
print()

# Each result row is a (Writer, book_count) pair, not a bare Writer, so the
# former ``List[Writer]`` annotation was incorrect and has been dropped.
writers = query.all()  # testing query
for writer, book_count in writers:
    print(f"{writer.name}: {book_count}")

# Release the connection held by the session.
db.close()
query:
SELECT writer.id AS writer_id,
writer.name AS writer_name,
(SELECT count(book_writer_association.book_id) AS count_1
FROM book_writer_association
JOIN book ON book.id = book_writer_association.book_id
WHERE book_writer_association.writer_id = writer.id
AND book.price > ?) AS book_count
FROM writer
Upvotes: 3