Curtwagner1984
Curtwagner1984

Reputation: 2088

How to set a M2M hybrid count property in SQLAlchemy?

I have two tables bound by an M2M relationship: Books and Writers. A writer can have many books, and a book can have many writers.

I want to have a count property on both books and writers so that I can sort by it, for example, to find the writer who wrote the most books.

# Association table backing the many-to-many link between books and writers.
book_writer_association_table = Table(
    'book_writer_association',
    Base.metadata,
    Column('book_id', ForeignKey('book.id'), primary_key=True),
    Column('Writer', ForeignKey('writer.id'), primary_key=True),
)

class Book(Base):
    """A book; linked to its writers through ``book_writer_association_table``."""

    # Must be 'book' so that ForeignKey('book.id') in the association table
    # resolves to this table.
    __tablename__ = 'book'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    # The target is given by name because Writer is defined after this class.
    writers = relationship("Writer", secondary=book_writer_association_table, back_populates="books")




class Writer(Base):
    """A writer; linked to their books through ``book_writer_association_table``."""

    __tablename__ = 'writer'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    books = relationship(Book, secondary=book_writer_association_table, back_populates="writers")

    @hybrid_property
    def book_count(self):
        """Return the number of books in this instance's ``books`` collection."""
        return len(self.books)

    @book_count.expression
    def book_count(cls):
        #what goes here?
        # Placeholder so the snippet parses; the SQL expression is the open question.
        raise NotImplementedError("SQL expression for book_count is not written yet")

I tried various approaches, such as the one detailed here:

class Foo(Base):
    """Mapped to the 'foo' table; rows point at a ``Bar`` row through ``bar_id``."""
    __tablename__ = 'foo'
    id = Column(Integer, primary_key=True)
    # Foreign key to bar.id.
    bar_id = Column(Integer, ForeignKey('bar.id'))
    # ORM relationship to the Bar class, referenced by name.
    bar = relationship('Bar')

class Bar(Base):
    """Mapped to the 'bar' table; exposes a hybrid ``foo_count``."""

    __tablename__ = 'bar'
    id = Column(Integer, primary_key=True)

    @hybrid_property
    def foo_count(self):
        """Count the ``Foo`` rows whose ``bar`` is this instance, via its session."""
        session = object_session(self)
        related = session.query(Foo).filter(Foo.bar==self)
        return related.count()

    @foo_count.expression
    def foo_count(cls):
        """Class-level form: ``SELECT count(foo.id)`` filtered on ``cls.id``, labelled 'foo_count'."""
        counter = select([func.count(Foo.id)])
        matching = counter.where(Foo.bar_id == cls.id)
        return matching.label('foo_count')

However, in this example there are only two tables, and I'm unsure how to achieve a more complicated join here. Another user suggested using column_property, but I ran into exactly the same problem there: I'm unsure how to add further tables to the join.

Upvotes: 3

Views: 765

Answers (1)

Yevhen Bondar
Yevhen Bondar

Reputation: 4707

You can adapt the idea from here to the M2M case. To do this, reference the association table in the hybrid_property instead of the Book table. This eliminates the join with the Book table and reduces your case to a one-to-many relation.

I came up with this solution.

from typing import List

from sqlalchemy import Column, ForeignKey, Integer, String, select, func, create_engine, Table
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, object_session, sessionmaker, Session

# Declare models
@as_declarative()
class Base:
    """Declarative base class that the mapped classes below inherit from."""

# Association table for the many-to-many link; both columns form the primary key.
book_writer_association_table = Table(
    'book_writer_association',
    Base.metadata,
    Column('book_id', ForeignKey('book.id'), primary_key=True),
    Column('writer_id', ForeignKey('writer.id'), primary_key=True),
)

class Book(Base):
    """Mapped to the 'book' table; linked to writers via the association table."""

    __tablename__ = 'book'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    writers = relationship(
        "Writer",
        secondary=book_writer_association_table,
        back_populates="books",
    )




class Writer(Base):
    """Mapped to the 'writer' table; exposes a hybrid ``book_count``."""

    __tablename__ = 'writer'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    books = relationship(
        "Book",
        secondary=book_writer_association_table,
        back_populates="writers",
    )

    @hybrid_property
    def book_count(self):
        """Count this writer's rows in the association table via its session."""
        session = object_session(self)
        links = book_writer_association_table
        owned = session.query(links).filter(links.c.writer_id == self.id)
        return owned.count()

    @book_count.expression
    def book_count(cls):
        """Class-level form: ``SELECT count(book_id)`` filtered on ``cls.id``, labelled 'book_count'."""
        links = book_writer_association_table
        counter = select([func.count(links.c.book_id)])
        return counter.where(links.c.writer_id == cls.id).label('book_count')

# Load DB schema
engine = create_engine('sqlite:///sqlite3.db')
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=True, bind=engine)
db: Session = SessionLocal()

# Creating test instances
b1 = Book(name="Book 1")
b2 = Book(name="Book 2")
db.add_all([b1, b2])

w1 = Writer(name="Writer 1")
w2 = Writer(name="Writer 2")
db.add_all([w1, w2])

# Book 1 gets both writers; Book 2 gets only Writer 1.
b1.writers.append(w1)
b1.writers.append(w2)
b2.writers.append(w1)

query = db.query(Writer, Writer.book_count)
print(str(query)) # checking query
print()
# Each row is unpacked as a (writer, book_count) pair in the loop below, so
# the result is deliberately not annotated as List[Writer].
writers = query.all() # testing query
for writer, book_count in writers:
    print(f"{writer.name}: {book_count}")

Result:

> Writer 1: 2
> Writer 2: 1

I'm unsure how to further add tables to the join.

The SQL generated by db.query(Writer, Writer.book_count) looks clean, without any joins, so I don't think you should have any problems with subsequent joins.

> SELECT writer.id AS writer_id, writer.name AS writer_name, (SELECT count(book_writer_association.book_id) AS count_1 
> FROM book_writer_association 
> WHERE book_writer_association.writer_id = writer.id) AS book_count 
> FROM writer

Edit: If you need to join the Book table for additional filtering, you can do it like this. Here I count only books with a price greater than 150.

from typing import List

from sqlalchemy import Column, ForeignKey, Integer, String, select, func, create_engine, Table
from sqlalchemy.ext.declarative import as_declarative
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, object_session, sessionmaker, Session

# Declare models
@as_declarative()
class Base:
    """Declarative base class that the mapped classes below inherit from."""

# Association table for the many-to-many link; both columns form the primary key.
book_writer_association_table = Table(
    'book_writer_association',
    Base.metadata,
    Column('book_id', ForeignKey('book.id'), primary_key=True),
    Column('writer_id', ForeignKey('writer.id'), primary_key=True),
)

class Book(Base):
    """Mapped to the 'book' table; carries a price and links to writers."""

    __tablename__ = 'book'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    price = Column(Integer)
    writers = relationship(
        "Writer",
        secondary=book_writer_association_table,
        back_populates="books",
    )




class Writer(Base):
    """Mapped to the 'writer' table; ``book_count`` counts books with ``price > 150``."""

    __tablename__ = 'writer'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    books = relationship("Book", secondary=book_writer_association_table, back_populates="writers")

    @hybrid_property
    def book_count(self):
        """Return how many of this writer's books have ``price > 150``.

        Runs a session query over the association table joined to ``Book``.
        """
        return (
            object_session(self)
                .query(book_writer_association_table)
                .join(Book, Book.id == book_writer_association_table.c.book_id)
                .filter(book_writer_association_table.c.writer_id == self.id)
                .filter(Book.price > 150)
                .count()
        )

    @book_count.expression
    def book_count(cls):
        """Return the SQL form of ``book_count``, labelled 'book_count'.

        Applies the same join and ``price > 150`` condition as the
        instance-level getter.
        """
        return (
            select([func.count(book_writer_association_table.c.book_id)])
                .join(Book, Book.id == book_writer_association_table.c.book_id)
                .where(book_writer_association_table.c.writer_id == cls.id)
                .where(Book.price > 150)
                .label('book_count')
        )

# Load DB schema
engine = create_engine('sqlite:///sqlite3.db')
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(autocommit=True, bind=engine)
db: Session = SessionLocal()

# Creating test instances
b1 = Book(name="Book 1", price=100)
b2 = Book(name="Book 2", price=200)
db.add_all([b1, b2])

w1 = Writer(name="Writer 1")
w2 = Writer(name="Writer 2")
db.add_all([w1, w2])

# Book 1 gets both writers; Book 2 gets only Writer 1.
b1.writers.append(w1)
b1.writers.append(w2)
b2.writers.append(w1)

query = db.query(Writer, Writer.book_count)
print(str(query)) # checking query
print()
# Each row is unpacked as a (writer, book_count) pair in the loop below, so
# the result is deliberately not annotated as List[Writer].
writers = query.all() # testing query
for writer, book_count in writers:
    print(f"{writer.name}: {book_count}")

query:

SELECT writer.id              AS writer_id,
       writer.name            AS writer_name,
       (SELECT count(book_writer_association.book_id) AS count_1
        FROM book_writer_association
                 JOIN book ON book.id = book_writer_association.book_id
        WHERE book_writer_association.writer_id = writer.id
          AND book.price > ?) AS book_count
FROM writer

Upvotes: 3

Related Questions