Giannis

Reputation: 5526

Improve SQLAlchemy update efficiency

I have two tables, users (~200,000 rows) and tweet (~2,000,000 rows). I need to update every user with their number of tweets and with the total favourites, replies and retweets of those tweets. This is inside a script:

@classmethod
def get_user_tweet_counts(cls, user_id):
    return (db_session
        .query(
            func.sum(Tweet.favorite_count).label('favorite_count'),
            func.sum(Tweet.retweet_count).label('retweet_count'),
            func.sum(Tweet.reply_count).label('reply_count'),
            func.count(Tweet.id).label('tweet_count'))
        .filter(Tweet.user_id == user_id)
        .group_by(Tweet.user_id).first())  # This will always be one result, should I query differently?
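On the question in the comment: the sketch below uses Python's built-in sqlite3 and a toy tweet table (an assumption; the column names only mirror the Tweet model) to show how GROUP BY changes the shape of the result. With GROUP BY, a user who has no tweets produces no row at all, so .first() would return None and aggregation_result[0] in the loop would raise a TypeError. Without GROUP BY, an aggregate query always returns exactly one row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tweet (id INTEGER PRIMARY KEY, user_id INTEGER, "
    "favorite_count INTEGER, retweet_count INTEGER, reply_count INTEGER)"
)
conn.executemany(
    "INSERT INTO tweet (user_id, favorite_count, retweet_count, reply_count) "
    "VALUES (?, ?, ?, ?)",
    [(1, 5, 2, 1), (1, 3, 0, 4)],  # toy data: user 1 has two tweets, user 2 none
)

AGG = ("SELECT SUM(favorite_count), SUM(retweet_count), SUM(reply_count), "
       "COUNT(id) FROM tweet WHERE user_id = ?")


def counts_grouped(user_id):
    # Mirrors the original query: with GROUP BY there is no row for a user without tweets
    return conn.execute(AGG + " GROUP BY user_id", (user_id,)).fetchone()


def counts_ungrouped(user_id):
    # Without GROUP BY the aggregate always yields one row (SUM -> NULL, COUNT -> 0)
    return conn.execute(AGG, (user_id,)).fetchone()


print(counts_grouped(1))
print(counts_grouped(2))
print(counts_ungrouped(2))
```

In the SQLAlchemy version, the corresponding change would be to drop .group_by(Tweet.user_id) and call .one() instead of .first(); the existing "or 0" fallbacks in the loop then turn the NULL sums into 0.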

db_session:

engine = create_engine('postgresql://tweetsql:[email protected]/tweetsql')
db_session = scoped_session(sessionmaker(autocommit=False,
                                         autoflush=True,
                                         bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

The loop, which takes about 10 minutes:

for user in all_users:
    update_count += 1
    aggregation_result = Tweet.get_user_tweet_counts(user.id)
    user.total_tweet_favourites = aggregation_result[0] or 0
    user.total_tweet_retweets = aggregation_result[1] or 0
    user.total_tweet_replies = aggregation_result[2] or 0
    user.tweet_count = aggregation_result[3] or 0
User.save()  # this just calls db_session.commit()
# We only commit the session once to speed things up
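The loop above issues one aggregate query per user, roughly 200,000 round trips. A possible middle ground (a swapped-in technique, not what the answer below proposes) is to fetch the aggregates for all users with a single GROUP BY query and look them up in a dict. This is sketched here with sqlite3 and hypothetical toy tables standing in for users and tweet.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY);
CREATE TABLE tweet (id INTEGER PRIMARY KEY, user_id INTEGER,
                    favorite_count INTEGER, retweet_count INTEGER,
                    reply_count INTEGER);
INSERT INTO users (id) VALUES (1), (2), (3);
INSERT INTO tweet (user_id, favorite_count, retweet_count, reply_count)
VALUES (1, 5, 2, 1), (1, 3, 0, 4), (3, 7, 1, 0);
""")

# A single GROUP BY query computes the aggregates for every user at once,
# instead of one query per user inside the loop.
totals = {
    row[0]: row[1:]
    for row in conn.execute(
        "SELECT user_id, SUM(favorite_count), SUM(retweet_count), "
        "SUM(reply_count), COUNT(id) FROM tweet GROUP BY user_id"
    )
}

# Users without tweets have no entry in the dict, so fall back to zeros.
ZERO = (0, 0, 0, 0)
for (user_id,) in conn.execute("SELECT id FROM users ORDER BY id").fetchall():
    favourites, retweets, replies, tweet_count = totals.get(user_id, ZERO)
    print(user_id, favourites, retweets, replies, tweet_count)
```

With the ORM, the same idea would mean replacing Tweet.get_user_tweet_counts(user.id) with totals.get(user.id, ZERO). This cuts the number of queries, but every User object is still loaded and tracked by the session, which is the overhead the answer below avoids.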

User and Tweet are declared as class User(Base) and class Tweet(Base), using the Base from the db_session snippet above.

While this is running, Python uses about 80% CPU and ~600 MB of memory. How can I make this faster? Tweet has an index on user_id and on its own id.

Upvotes: 1

Views: 1001

Answers (1)

univerio

Reputation: 20548

The author of SQLAlchemy has written a great answer on this topic. Basically, you'll want to bypass the ORM if you need to scale to a large number of rows.
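As an illustration of what bypassing the ORM can look like at the driver level, here is a sketch using Python's built-in sqlite3 as a stand-in for the PostgreSQL driver. The users table is a trimmed-down hypothetical version, and the new values are toy data that would in practice come from an aggregate query. The values travel as plain tuples, so no ORM objects are created and the session does no per-object change tracking.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY,
                    tweet_count INTEGER DEFAULT 0,
                    total_tweet_favourites INTEGER DEFAULT 0);
INSERT INTO users (id) VALUES (1), (2), (3);
""")

# Toy values, each tuple is (tweet_count, total_tweet_favourites, user_id).
new_values = [(2, 8, 1), (0, 0, 2), (1, 7, 3)]

# One parameterised UPDATE executed once per tuple, then a single commit.
conn.executemany(
    "UPDATE users SET tweet_count = ?, total_tweet_favourites = ? WHERE id = ?",
    new_values,
)
conn.commit()

print(conn.execute(
    "SELECT id, tweet_count, total_tweet_favourites FROM users ORDER BY id"
).fetchall())
```

This still sends one UPDATE per user. The single set-based statement below goes further by letting the database do both the aggregation and the update.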

In your particular situation, you can write a single query to achieve the same result using SQL aggregation:

UPDATE users SET
  total_tweet_favourites = aggregated.total_tweet_favourites,
  total_tweet_retweets = aggregated.total_tweet_retweets,
  total_tweet_replies = aggregated.total_tweet_replies,
  tweet_count = aggregated.tweet_count
FROM (
  SELECT
    users.id AS id,
    SUM(tweets.favorite_count) AS total_tweet_favourites,
    SUM(tweets.retweet_count) AS total_tweet_retweets,
    SUM(tweets.reply_count) AS total_tweet_replies,
    COUNT(tweets.id) AS tweet_count
  FROM users JOIN tweets ON tweets.user_id = users.id
  GROUP BY users.id
) aggregated
WHERE users.id = aggregated.id;
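One caveat: because the subquery uses an inner JOIN, users with no tweets get no row in aggregated and are left untouched, whereas the "or 0" fallbacks in the original loop suggest they should be set to 0. In PostgreSQL, changing the JOIN to a LEFT OUTER JOIN and wrapping each SUM in COALESCE(..., 0) should cover them. The sketch below shows the same idea with sqlite3 and hypothetical toy tables. It uses correlated subqueries instead of UPDATE ... FROM (a swapped-in form, chosen because it runs on any SQLite version), but it is still a single statement that updates every user.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY,
                    total_tweet_favourites INTEGER, total_tweet_retweets INTEGER,
                    total_tweet_replies INTEGER, tweet_count INTEGER);
CREATE TABLE tweets (id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES users(id),
                     favorite_count INTEGER, retweet_count INTEGER,
                     reply_count INTEGER);
INSERT INTO users (id) VALUES (1), (2);
INSERT INTO tweets (user_id, favorite_count, retweet_count, reply_count)
VALUES (1, 5, 2, 1), (1, 3, 0, 4);
""")

# One UPDATE for all users; COALESCE turns the NULL sum for "no tweets" into 0,
# and COUNT over no rows is already 0.
conn.execute("""
UPDATE users SET
  total_tweet_favourites = COALESCE(
    (SELECT SUM(favorite_count) FROM tweets WHERE tweets.user_id = users.id), 0),
  total_tweet_retweets = COALESCE(
    (SELECT SUM(retweet_count) FROM tweets WHERE tweets.user_id = users.id), 0),
  total_tweet_replies = COALESCE(
    (SELECT SUM(reply_count) FROM tweets WHERE tweets.user_id = users.id), 0),
  tweet_count = (SELECT COUNT(id) FROM tweets WHERE tweets.user_id = users.id)
""")
conn.commit()

rows = conn.execute(
    "SELECT id, total_tweet_favourites, total_tweet_retweets, "
    "total_tweet_replies, tweet_count FROM users ORDER BY id"
).fetchall()
print(rows)
```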

To translate this to SQLAlchemy:

from sqlalchemy import func

aggregated = (
    session.query(
        User.id.label("id"),
        func.sum(Tweet.favorite_count).label("total_tweet_favourites"),
        func.sum(Tweet.retweet_count).label("total_tweet_retweets"),
        func.sum(Tweet.reply_count).label("total_tweet_replies"),
        func.count(Tweet.id).label("tweet_count"))
    .select_from(User)
    .join(Tweet)  # ON clause inferred from the tweets.user_id -> users.id foreign key
    .group_by(User.id)
    .subquery("aggregated")  # name the subquery directly instead of .subquery().alias()
)
users = User.__table__
query = (
    users.update()
    .values(
        total_tweet_favourites=aggregated.c.total_tweet_favourites,
        total_tweet_retweets=aggregated.c.total_tweet_retweets,
        total_tweet_replies=aggregated.c.total_tweet_replies,
        tweet_count=aggregated.c.tweet_count)
    # referencing aggregated in WHERE makes PostgreSQL render UPDATE ... FROM
    .where(users.c.id == aggregated.c.id)
)
session.execute(query)
session.commit()  # the session uses autocommit=False, so commit explicitly

Upvotes: 2
