Sean Fujiwara

Reputation: 4546

How can I maintain a consistent count of entities with App Engine?

Simplified context:

I have this schema for storing users, their messages, and a count of how many unread messages they have.

from google.appengine.ext.ndb import *

class User(Model):
  unread_messages = IntegerProperty()

class Message(Model):
  read = BooleanProperty()
  user_id = IntegerProperty()

My goal is for unread_messages to hold the correct value after a user has read some messages. When creating a message, it's easy to use a transaction to increment the unread_messages property by one and move on. Reading messages is harder.

Here's what I've tried:

1. Updating the entity using only relative changes.

The problem is that the delta comes from a query, and two requests could get the same results back before either write is complete, so the same messages would be subtracted twice.

#User reads messages

query = Message.query()
query = query.filter(Message.read == False)
query = query.filter(Message.user_id == user.key.id())
unread_messages = query.fetch(10)

for message in unread_messages:
  message.read = True

put_multi(unread_messages)

txn(user.key.id(), -len(unread_messages))

@transactional  # ndb's transaction decorator
def txn(id, delta):
  user = Key(User, id).get()
  user.unread_messages += delta
  user.put()
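
The double-counting described above can be reproduced without App Engine. The following is a minimal plain-Python sketch, not ndb code: `stale_query`, `apply_read`, and the dict-based `messages` and `user` are hypothetical stand-ins, and the assumption is that two requests run the query before either put_multi becomes visible to queries.

```python
# Plain-Python model of the race; no App Engine calls are made.
# All names here are hypothetical stand-ins for the entities above.

def stale_query(messages):
    """Mimic a query that has not yet seen pending writes."""
    return [m_id for m_id, is_read in messages.items() if not is_read]

messages = {1: False, 2: False, 3: False}  # message id -> read flag
user = {"unread_messages": 3}

# Two requests run the query before either write is visible.
batch_a = stale_query(messages)
batch_b = stale_query(messages)

def apply_read(batch):
    """Mark the batch as read, then apply the relative update from txn()."""
    for m_id in batch:
        messages[m_id] = True
    user["unread_messages"] -= len(batch)

apply_read(batch_a)
apply_read(batch_b)

# The real number of unread messages, counted directly from the flags.
unread_now = sum(1 for is_read in messages.values() if not is_read)
print(user["unread_messages"], unread_now)
```

The counter update is itself applied correctly each time; the error comes entirely from both requests computing their delta from the same query results.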

2. Running a count query after a put.

As far as I can tell, there's no way to execute code after a write is guaranteed to be visible in a query. So for this method, I just set a delay of a few seconds on the task. This works most of the time, but it's easy to see how a write that takes longer than my delay would result in an incorrect value.

query = Message.query()
query = query.filter(Message.read == False)
query = query.filter(Message.user_id == user.key.id())
unread_messages = query.fetch(10)

for message in unread_messages:
  message.read = True

put_multi(unread_messages)

taskqueue.add(url = '/tasks/update-unread-messages', params = {'user_id': user.key.id()}, countdown = 10)

Associated task:

query = Message.query()
query = query.filter(Message.read == False)
query = query.filter(Message.user_id == user.key.id())
count = query.count()
user.unread_messages = count
user.put()

@transactional  # ndb's transaction decorator
def txn(id, delta):
  user = Key(User, id).get()
  user.unread_messages += delta
  user.put()

Here's what I've considered trying:

  1. Giving a user's messages the same ancestor so I can do ancestor queries. This would be a last resort because of the performance constraints of ancestor queries, and the fact that I don't want to have to replace every entity.

  2. Using the transactional flag with the taskqueue. For my put_multi to be transactional though, I would need to use an ancestor query.

  3. Restructuring the schema in various ways, but it always comes back to being able to run code once I know a put is complete.

Upvotes: 1

Views: 222

Answers (1)

voscausa

Reputation: 11706

Updating every message that gets read and then updating a counter feels expensive. Why not keep the keys of the unread messages on an entity, so the messages can be fetched quickly by key with a multi-get, and update only a single User entity that holds both an index and a count of all the unread messages?

class User(ndb.Model):
    unread_messages_count = ndb.IntegerProperty(default=0) 
    unread_messages_index = ndb.KeyProperty(repeated=True)

This unread message index will only work if the messages are read in the same order as they arrive. See comments below.
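
To illustrate why keeping the index and the count on one entity helps, here is a minimal plain-Python sketch. It is not ndb code: `UserRecord`, `add_message`, and `mark_read` are hypothetical names, message keys are modeled as strings, and the assumption is that in a real app each function body would run inside a single transaction on the User entity. Unlike the caveat above, this sketch removes keys by membership rather than relying on arrival order, which is a choice made for the illustration only.

```python
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class UserRecord:
    """Hypothetical stand-in for the User entity (not an ndb.Model)."""
    unread_messages_count: int = 0
    unread_messages_index: List[str] = field(default_factory=list)


def add_message(user: UserRecord, message_key: str) -> None:
    """Record a new unread message; adding the same key twice is a no-op."""
    if message_key not in user.unread_messages_index:
        user.unread_messages_index.append(message_key)
    # The count is always derived from the index, never from a query.
    user.unread_messages_count = len(user.unread_messages_index)


def mark_read(user: UserRecord, message_keys: Iterable[str]) -> None:
    """Drop the given keys from the index; repeating the call changes nothing."""
    done = set(message_keys)
    user.unread_messages_index = [
        key for key in user.unread_messages_index if key not in done
    ]
    user.unread_messages_count = len(user.unread_messages_index)


user = UserRecord()
add_message(user, "m1")
add_message(user, "m2")
mark_read(user, ["m1"])
mark_read(user, ["m1"])  # a repeated read does not decrement again
print(user.unread_messages_count, user.unread_messages_index)
```

Because the count is recomputed from the index in the same update, marking the same message as read twice cannot push the counter out of step, which is the failure mode of the relative-delta approach in the question.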

Upvotes: 1
