Reputation: 5526
I have two tables, users (~200,000 rows) and tweet (~2,000,000 rows). I need to update every user with their number of tweets and the total favourites, replies, and retweets of those tweets. This is inside a script:
@classmethod
def get_user_tweet_counts(cls, user_id):
    return (db_session
            .query(
                func.sum(Tweet.favorite_count).label('favorite_count'),
                func.sum(Tweet.retweet_count).label('retweet_count'),
                func.sum(Tweet.reply_count).label('reply_count'),
                func.count(Tweet.id).label('tweet_count'))
            .filter(Tweet.user_id == user_id)
            .group_by(Tweet.user_id).first())  # This will always be one result, should I query differently?
db_session:
engine = create_engine('postgresql://tweetsql:[email protected]/tweetsql')
db_session = scoped_session(sessionmaker(autocommit=False,
                                         autoflush=True,
                                         bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
The 10 minute loop:
for user in all_users:
    update_count += 1
    aggregation_result = Tweet.get_user_tweet_counts(user.id)
    user.total_tweet_favourites = aggregation_result[0] or 0
    user.total_tweet_retweets = aggregation_result[1] or 0
    user.total_tweet_replies = aggregation_result[2] or 0
    user.tweet_count = aggregation_result[3] or 0
User.save()  # this just calls db_session.commit()
# We only commit the session once to speed things up
User and Tweet are declared like:
User(Base), Tweet(Base) (from the db_session snippet).
While this is running, Python hits 80% CPU and ~600 MB of memory. How can I make this better? Tweet has an index on user_id and on its own id.
Upvotes: 1
Views: 1001
Reputation: 20548
Here is a great answer by the author of SQLAlchemy. Basically, you'll want to bypass the ORM if you need to scale to a large number of rows.
In your particular situation, you can write a single query to achieve the same result using SQL aggregation:
UPDATE users SET
    total_tweet_favourites = aggregated.total_tweet_favourites,
    total_tweet_retweets = aggregated.total_tweet_retweets,
    total_tweet_replies = aggregated.total_tweet_replies,
    tweet_count = aggregated.tweet_count
FROM (
    SELECT
        users.id AS id,
        SUM(tweets.favorite_count) AS total_tweet_favourites,
        SUM(tweets.retweet_count) AS total_tweet_retweets,
        SUM(tweets.reply_count) AS total_tweet_replies,
        COUNT(tweets.id) AS tweet_count
    FROM users JOIN tweets ON tweets.user_id = users.id
    GROUP BY users.id
) aggregated
WHERE users.id = aggregated.id;
To translate this to SQLAlchemy:
aggregated = session \
    .query(
        User.id.label("id"),
        func.sum(Tweet.favorite_count).label("total_tweet_favourites"),
        func.sum(Tweet.retweet_count).label("total_tweet_retweets"),
        func.sum(Tweet.reply_count).label("total_tweet_replies"),
        func.count(Tweet.id).label("tweet_count")) \
    .select_from(User) \
    .join(Tweet) \
    .group_by(User.id) \
    .subquery() \
    .alias("aggregated")
query = User.__table__ \
    .update() \
    .values(
        total_tweet_favourites=aggregated.c.total_tweet_favourites,
        total_tweet_retweets=aggregated.c.total_tweet_retweets,
        total_tweet_replies=aggregated.c.total_tweet_replies,
        tweet_count=aggregated.c.tweet_count) \
    .where(User.__table__.c.id == aggregated.c.id)
session.execute(query)
session.commit()  # the session uses autocommit=False, so commit explicitly
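If you want to try the set-based approach without a PostgreSQL instance, here is a minimal sketch using the standard library's sqlite3 module. It is not the exact statement above: it swaps UPDATE ... FROM for correlated subqueries, because older SQLite builds do not support UPDATE ... FROM. The toy schema, the sample rows, and the index name are assumptions for illustration; only the column names are taken from the question and the query above. One difference worth noting: the inner JOIN in the query above only touches users who have at least one tweet, whereas this variant also sets users without tweets to 0.

```python
import sqlite3

# Hypothetical in-memory schema; column names mirror the ones used above.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        total_tweet_favourites INTEGER DEFAULT 0,
        total_tweet_retweets INTEGER DEFAULT 0,
        total_tweet_replies INTEGER DEFAULT 0,
        tweet_count INTEGER DEFAULT 0
    );
    CREATE TABLE tweets (
        id INTEGER PRIMARY KEY,
        user_id INTEGER REFERENCES users(id),
        favorite_count INTEGER,
        retweet_count INTEGER,
        reply_count INTEGER
    );
    CREATE INDEX ix_tweets_user_id ON tweets(user_id);
""")

# Made-up sample data: user 1 has two tweets, user 2 has one, user 3 has none.
conn.executemany("INSERT INTO users (id) VALUES (?)", [(1,), (2,), (3,)])
conn.executemany(
    "INSERT INTO tweets (user_id, favorite_count, retweet_count, reply_count) "
    "VALUES (?, ?, ?, ?)",
    [(1, 5, 1, 0), (1, 2, 3, 4), (2, 10, 0, 1)],
)

# One statement updates every user; no per-user round trip from Python.
# COALESCE turns the NULL that SUM returns for "no tweets" into 0.
conn.execute("""
    UPDATE users SET
        total_tweet_favourites = (
            SELECT COALESCE(SUM(favorite_count), 0)
            FROM tweets WHERE tweets.user_id = users.id),
        total_tweet_retweets = (
            SELECT COALESCE(SUM(retweet_count), 0)
            FROM tweets WHERE tweets.user_id = users.id),
        total_tweet_replies = (
            SELECT COALESCE(SUM(reply_count), 0)
            FROM tweets WHERE tweets.user_id = users.id),
        tweet_count = (
            SELECT COUNT(*)
            FROM tweets WHERE tweets.user_id = users.id)
""")
conn.commit()

rows = conn.execute("""
    SELECT id, total_tweet_favourites, total_tweet_retweets,
           total_tweet_replies, tweet_count
    FROM users ORDER BY id
""").fetchall()
print(rows)
```

On real data the relative cost of correlated subqueries versus a single grouped join depends on the database and its planner, so treat this as a way to check the logic rather than as a benchmark.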
Upvotes: 2