Reputation: 4546
I have this schema for storing users, their messages, and a count of how many unread messages they have.
from google.appengine.ext.ndb import *

class User(Model):
    # Number of messages this user has not read yet
    unread_messages = IntegerProperty()

class Message(Model):
    read = BooleanProperty()
    # Integer ID of the User this message belongs to
    user_id = IntegerProperty()
My goal is for unread_messages to contain the correct value after a user has read some messages. Creating a message is easy: I use a transaction to increase the unread_messages property by one and move on. Reading messages seems to be more difficult.
The problem is that the delta comes from a query, and because the query is eventually consistent, it could return the same results twice before a write is visible to it.
# User reads messages

@transactional
def txn(id, delta):
    # Adjust the user's counter inside a transaction
    user = Key(User, id).get()
    user.unread_messages += delta
    user.put()

query = Message.query()
query = query.filter(Message.read == False)
query = query.filter(Message.user_id == user.key.id())
unread_messages = query.fetch(10)
for message in unread_messages:
    message.read = True
put_multi(unread_messages)
# The delta is only as accurate as the query results above
txn(user.key.id(), -len(unread_messages))
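To make the race concrete, here is a toy model in plain Python. It is only a sketch: ToyStore and its methods are hypothetical stand-ins for the datastore, not App Engine APIs, and the assumption is simply that entity writes land immediately while the index that serves the query lags behind.

```python
# Toy model of an eventually consistent query; no App Engine APIs are used.

class ToyStore(object):
    """Hypothetical store: entity writes land at once, the query index lags."""

    def __init__(self, unread_ids):
        # Authoritative per-message "read" flag, keyed by message id
        self.read = dict((i, False) for i in unread_ids)
        # What a non-ancestor query for unread messages currently sees
        self.index = set(unread_ids)
        # Stand-in for User.unread_messages
        self.unread_messages = len(unread_ids)

    def query_unread(self):
        # Served from the (possibly stale) index
        return sorted(self.index)

    def mark_read(self, ids):
        # Entities change now; the index is untouched until apply_index()
        for i in ids:
            self.read[i] = True

    def apply_index(self):
        # The index finally catches up with the entity data
        self.index = set(i for i, r in self.read.items() if not r)


def read_messages(store):
    ids = store.query_unread()
    store.mark_read(ids)
    store.unread_messages -= len(ids)  # delta derived from the query
    return ids


store = ToyStore([1, 2, 3])
first = read_messages(store)   # sees 3 unread messages, counter goes 3 -> 0
second = read_messages(store)  # index still stale: same 3 again, 0 -> -3
store.apply_index()
print(store.unread_messages)
```

The second call subtracts the same three messages again, so the counter ends up negative even though every message is correctly marked as read.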
As far as I can tell, there's no way to execute code after a write is guaranteed to be visible in a query. So for this method, I just set a delay of a few seconds on the task. This works most of the time, but it's easy to see how a write that takes longer than my delay would result in an incorrect value.
from google.appengine.api import taskqueue

query = Message.query()
query = query.filter(Message.read == False)
query = query.filter(Message.user_id == user.key.id())
unread_messages = query.fetch(10)
for message in unread_messages:
    message.read = True
put_multi(unread_messages)
# Recount later, hoping the writes are visible to queries by then
taskqueue.add(url='/tasks/update-unread-messages', params={'user_id': user.key.id()}, countdown=10)
Associated task:
# user is the User entity for the task's user_id parameter
query = Message.query()
query = query.filter(Message.read == False)
query = query.filter(Message.user_id == user.key.id())
count = query.count()
user.unread_messages = count
user.put()
Giving a user's messages the same ancestor so I can do ancestor queries. This would be a last resort because of the performance constraints of ancestor queries, and the fact that I don't want to have to replace every entity.
Using the transactional flag with the taskqueue. For my put_multi to be transactional though, I would need to use an ancestor query.
Restructuring the schema in various ways, but it always comes back to being able to run code once I know a put is complete.
Upvotes: 1
Views: 222
Reputation: 11706
It feels expensive to update every message that gets read and also update a counter. Why not store the keys of the unread messages on the user entity? You can then read the messages quickly by key (a multi-get) and update only a single user entity, which holds both an index and a count for all the unread messages.
from google.appengine.ext import ndb

class User(ndb.Model):
    unread_messages_count = ndb.IntegerProperty(default=0)
    # Keys of the messages this user has not read yet
    unread_messages_index = ndb.KeyProperty(repeated=True)
This unread message index will only work if the messages are read in the same order as they arrive. See comments below.
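The bookkeeping this implies can be sketched in plain Python. This is an illustration under assumptions, not tested App Engine code: ToyUser, add_message, and read_messages are hypothetical names, and plain strings stand in for message keys. In a real app, each function body would presumably run inside a transaction on the single User entity, and the returned keys would be passed to ndb.get_multi.

```python
class ToyUser(object):
    """Stand-in for the User entity with an unread index and count."""

    def __init__(self):
        self.unread_messages_index = []  # keys of unread messages, oldest first
        self.unread_messages_count = 0


def add_message(user, message_key):
    # A new message is recorded on the user entity only
    user.unread_messages_index.append(message_key)
    user.unread_messages_count = len(user.unread_messages_index)


def read_messages(user, limit=10):
    # Take the oldest keys off the index; the count is derived from the
    # same entity, so it cannot drift away from the index.
    batch = user.unread_messages_index[:limit]
    user.unread_messages_index = user.unread_messages_index[limit:]
    user.unread_messages_count = len(user.unread_messages_index)
    return batch


user = ToyUser()
for n in range(12):
    add_message(user, 'message-%d' % n)

first_batch = read_messages(user)   # the 10 oldest keys
second_batch = read_messages(user)  # the remaining 2 keys
third_batch = read_messages(user)   # nothing left to read
print(user.unread_messages_count)
```

Because the keys and the count live on one entity, no query is involved in working out the delta, which is the part that was eventually consistent in the question.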
Upvotes: 1