Reputation: 178
I have a large number of entities (products) in my datastore that come from an external data source. I want to check them for updates daily.
Some items are already up to date because the application fetched them directly. Some are newly inserted and don't need updates.
For the ones that have not been fetched yet, I have cron jobs running. I use the Python API.
At the moment I do the following.
I have a field:
dateupdated = db.DateTimeProperty(auto_now_add=True)
I can then use
query = dbmodel.product.all()
query.filter('dateupdated <', newdate)
query.order('dateupdated')
results = query.fetch(limit=mylimit, offset=myoffset)
to pick the oldest entries and schedule them for update. I used the Task Queue with custom task names to make sure each product update is only run once a day.
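For reference, the deduplication looks roughly like this (the helper name and the URL are illustrative, not my exact code):

# Sketch of daily dedup via named tasks. Task names may only contain
# [a-zA-Z0-9_-] (so this assumes the product key name is name-safe),
# and a used name cannot be reused for a while afterwards, so one
# name per product per day enqueues each update at most once a day.
import datetime
from google.appengine.api import taskqueue

def schedule_product_update(product_key_name):
    task_name = 'update-%s-%s' % (
        product_key_name, datetime.date.today().strftime('%Y-%m-%d'))
    try:
        taskqueue.add(name=task_name,
                      url='/tasks/update_product',
                      params={'key': product_key_name})
    except (taskqueue.TaskAlreadyExistsError,
            taskqueue.TombstonedTaskError):
        pass  # already scheduled today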
The problem is that I need to update the dateupdated field, which means a datastore write even if a product's data has not changed, just to keep track of the update process.
This consumes a lot of resources (CPU hours, Datastore API calls, etc.).
Is there a better way to perform such a task and avoid the unnecessary datastore writes?
Upvotes: 1
Views: 364
Reputation: 14175
Yes: use cursors.
By ordering a query by dateupdated and then storing a cursor after you have processed your entities, you can re-run the same query later to get only the items updated after your last query.
So, given a class like:
class MyEntity(db.Model):
    dateupdated = db.DateTimeProperty(auto_now_add=True)
you could set up a handler to be run as a task, like:
from google.appengine.api import taskqueue
from google.appengine.ext import webapp

class ProcessNewEntities(webapp.RequestHandler):
    def get(self):
        """Run via a task to process batches of 'batch_size'
        recently updated entities."""
        # number of entities to process per task execution
        batch_size = 100
        # build the basic query, oldest updates first
        q = MyEntity.all().order("dateupdated")
        # resume from the previous batch's cursor, if one was passed in
        cursor = self.request.get("cursor")
        if cursor:
            q.with_cursor(cursor)
        # fetch the batch
        entities = q.fetch(batch_size)
        for entity in entities:
            # process the entity
            do_your_processing(entity)
        # queue up the next task to process the next batch;
        # if we have no more to process right now, delay the
        # next run so that it doesn't hog the application
        delay = 600 if len(entities) < batch_size else 0
        taskqueue.add(
            url='/tasks/process_new_entities',
            params={'cursor': q.cursor()},
            countdown=delay)
and then you just need to trigger the first task execution, like:
def start_processing_entities():
    taskqueue.add(url='/tasks/process_new_entities')
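For completeness, here's a rough sketch of wiring the handler into the old webapp framework (standard boilerplate, assumed rather than taken from your app; the route just has to match the URL passed to taskqueue.add):

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

# map the task URL to the handler so the queued tasks reach it
application = webapp.WSGIApplication([
    ('/tasks/process_new_entities', ProcessNewEntities),
])

def main():
    run_wsgi_app(application)

if __name__ == '__main__':
    main()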
Upvotes: 1