Reputation: 11
Hi, Need advice on how to use cursor functions in NDB model.I am able to use with_cursor in DB model but its not available in NDB model:
def myDBfn(self):
cursor_from_url = self.request.get('cursor')
logging.info("Cursor in start is: {}".format(cursor_from_url))
process_date = util_misc.get_start_date_from_day_param(self)
gql_query_text = 'Select * From DummyTable'
#Execute query
query_o = db.GqlQuery(gql_query_text, process_date)
if cursor_from_url:
query_o.with_cursor(cursor_from_url)
BATCH_SIZE = 500
results = query_o.fetch(BATCH_SIZE)
else:
BATCH_SIZE = 1
results = query_o.fetch(BATCH_SIZE)
if not results:
logging.info("pref_email_update_task: all done")
return
logging.info("Exiting myDBfn....")
return
def myNDBfn(self):
cursor_from_url = Cursor(urlsafe=self.request.get('cursor'))
logging.info("Cursor in start is: {}".format(cursor_from_url))
gql_query_text = 'Select * From TestSequences' #Define query conditions here
#Execute query
query_o = ndb.gql(gql_query_text)
if cursor_from_url:
query_o.fetch_page(5,start_cursor=cursor_from_url)
BATCH_SIZE = 500
results = query_o.fetch(BATCH_SIZE)
else:
BATCH_SIZE = 1
results = query_o.fetch(BATCH_SIZE)
if not results:
logging.info("feed_sequence_update_second_task: all done")
return
logging.info("result is {}".format(results))
logging.info("Exiting myNDBfn....")
return
But in NDB model this cursor_from_url is giving an error:raise datastore_errors.BadValueError('invalid cursor') Kindly advice!!!!!
Upvotes: 0
Views: 220
Reputation: 11
import webapp2 import logging from google.appengine.api import taskqueue from google.appengine.ext import ndb
from models.feed_models import TestSequences, FeedsSequences from controllers.system import template, error as err, security as sec from collections import defaultdict from google.appengine.ext.ndb import Cursor
class FeedModelCheck(webapp2.RequestHandler):
get_methods = {
'/admin/feed/test_create_dummy_TestSequences_records': 'test_create_dummy_TestSequences_records',
'/admin/feed/add-top-searches-daily' : 'feed_sequence_update_daily',
'/admin/feed/add-top-searches-alltime' : 'feed_sequence_update_alltime',
}
post_methods = {
'/admin/feed/add-top-searches-alltime' : 'feed_sequence_update_alltime',
'/admin/feed/add-top-searches-daily' : 'feed_sequence_update_daily'
}
# Methods Definitions
def __init__(self, request='', response=''):
super(FeedModelCheck, self).__init__(request, response)
self.request = request
self.response = response
self.GETSTR = 'get'
self.POSTSTR = 'post'
return
def get(self):
logging.info('In get method')
self.process_url(self.GETSTR)
return
def post(self):
logging.info('In Post method')
self.process_url(self.POSTSTR)
return
def process_url(self, source):
#validate URL
rcvd_url = self.request.path
logging.info('Rcvd url = %s', rcvd_url)
if source == self.GETSTR:
func_map = self.get_methods
else:
func_map = self.post_methods
secObj = sec.SecurityHandler()
if secObj.validate_url(rcvd_url):
#get destination method using key-value pair
dest_method = func_map.get(rcvd_url, None) # Correction 2
logging.info("dest_method is {}".format(dest_method))
if dest_method:
func = getattr(self, dest_method, None)
logging.info("func is {}".format(func))
if func:
func()
logging.info('Finished %s method', source)
return
#Invalid url
else:
pass #case handled below
#Invalid URL
err_obj = err.ErrorHandler()
err_obj.handle_invalid_url(rcvd_url)
self.error(404)
logging.info('Finished %s method with error', source)
return
def test_create_dummy_TestSequences_records(self):
#In this function, create 10-15 dummy records in TestSequences database for testing purposes
logging.info("In test_create_dummy_TestSequences_records")
## LOGIC GOES HERE ##
key_seq_list = ["seqid_1","seqid_2","seqid_3", "seqid_4"]
user_email_list = ["[email protected]","[email protected]","[email protected]","[email protected]"]
user_type_list = ["teacher","intraining","therapist","teacher"]
seq_poses_list = [["pose1","pose2","pose3","pose4"],["pose5","pose6","pose7","pose8"],["pose9","pose10","pose11","pose12"],["pose13","pose14","pose15","pose16"]]
seq_poses_custom_list = ["pose_c1","pose_c2","pose_c3","pose_c4"]
seq_poses_num_list = [1,2,3,4]
seq_poses_breath_count_list = ["100","200","300","400"]
seq_poses_breath_type_list = ["normal","high","low","normal"]
seq_poses_duration_list = ["10 seconds","20 seconds","30 seconds","10 seconds"]
seq_poses_renames_list = ["1","2","3","4"]
seq_cues_list = ["cue1","cue2","cue3","cue4"]
seq_title_list = ["seq_1","seq_2","seq_3","seq_4"]
seq_desc_list = ["demo1","demo2","demo3","demo4"]
seq_tags_list = [["tag1"],["tag2"],["tag3"],["tag4"]]
seq_level_list = ["level_1","level_2","level_3","level_4"]
seq_theme_list = ["theme1","theme2","theme3","theme4"]
seq_focus_list = ["focus1","focus2","focus3","focus4"]
seq_props_list = [["props1"],["props2"],["props3"],["props4"]]
seq_props_other_list = ["props_others1","props_others2","props_others3","props_others4"]
seq_access_list = ["public","private","public","private"]
seq_duration_list = [150,250,360,450]
seq_playlist_list = ["s10","s20","s30","s40"]
seq_local_id_list = ["L1","L2","L1","L2"]
seq_source_seq_id_list = ["0001","0002","0003","0004"]
seq_source_seq_type_list = ["self","teacher","tummee","merged"]
seq_yoga_type_list = ["yoga_type_a","yoga_type_a","yoga_type_a","yoga_type_b"]
deleted_list = [True,False,True,False]
i = 0
for key_item in key_seq_list:
#Use get_or_inser to fetch function to check data presence in TestSequences Model.
test_sequence = TestSequences.get_or_insert(key_item, user_email= user_email_list[i],user_type = user_type_list[i],seq_poses=seq_poses_list[i],seq_poses_custom = seq_poses_custom_list[i],seq_poses_num = seq_poses_num_list[i],seq_poses_breath_count = seq_poses_breath_count_list[i],seq_poses_breath_type = seq_poses_breath_type_list[i],seq_poses_duration = seq_poses_duration_list[i],seq_poses_renames = seq_poses_renames_list[i],seq_cues = seq_cues_list[i],seq_title = seq_title_list[i],seq_desc = seq_desc_list[i],seq_tags = seq_tags_list[i],seq_level = seq_level_list[i],seq_theme = seq_theme_list[i],seq_focus = seq_focus_list[i],seq_props = seq_props_list[i],seq_props_other = seq_props_other_list[i],seq_access = seq_access_list[i],seq_duration = seq_duration_list[i],seq_playlist = seq_playlist_list[i],seq_local_id = seq_local_id_list[i],seq_source_seq_id = seq_source_seq_id_list[i],seq_source_seq_type = seq_source_seq_type_list[i],seq_yoga_type = seq_yoga_type_list[i],deleted = deleted_list[i])
#Use put() in a for loop to insert data in TestSequences Model.
test_sequence.put()
if (i < (len(key_seq_list)-1)):
i+=1
logging.info("Exiting test_create_dummy_TestSequences_records")
return
def feed_sequence_update_alltime(self):
logging.info("In feed_sequence_update_alltime")
urlsafe_cursor = self.request.get('cursor')
cursor_from_url = Cursor(urlsafe=urlsafe_cursor) if urlsafe_cursor else None
logging.info("Cursor in start is: {}".format(cursor_from_url))
#Define query here
gql_query_text = 'Select * From TestSequences'
#Execute query
query_o = ndb.gql(gql_query_text)
BATCH_SIZE = 500
results, next_cursor, more = query_o.fetch_page(BATCH_SIZE, start_cursor=cursor_from_url)
if more:
pass
if next_cursor:
pass
if not results:
logging.info("feed_sequence_update_task: all done")
return
#Calling fn to insert data in FeedSequence
record_type = 'alltime'
FeedModelCheck.add_feeds_sequences_records(self,results,record_type)
cursor = query_o.fetch()
taskqueue.add(url='/admin/feed/add-top-searches-alltime', params={'cursor': cursor})
logging.info("Exiting feed_sequence_update_alltime")
return
def feed_sequence_update_daily(self):
logging.info("In feed_sequence_update_daily")
urlsafe_cursor = self.request.get('cursor')
cursor_from_url = Cursor(urlsafe=urlsafe_cursor) if urlsafe_cursor else None
logging.info("cursor_from_url in start is: {}".format(cursor_from_url))
logging.info("urlsafe_cursor in start is: {}".format(urlsafe_cursor))
#Define query here
gql_query_text = 'Select * From TestSequences'
#Execute query
query_o = ndb.gql(gql_query_text)
BATCH_SIZE = 500
results, next_cursor, more = query_o.fetch_page(BATCH_SIZE, start_cursor=cursor_from_url)
if more:
pass
if next_cursor:
pass
if not results:
logging.info("feed_sequence_update_second_task: all done")
return
logging.info(" results is {}".format(results))
#Calling fn to insert data in FeedSequence
record_type = 'daily'
FeedModelCheck.add_feeds_sequences_records(self,results,record_type)
cursor = query_o.fetch()
taskqueue.add(url='/admin/feed/add-top-searches-daily', params={'cursor': cursor})
logging.info("Exiting feed_sequence_update_daily")
return
def add_feeds_sequences_records(self,results,rec_type):
logging.info("Entering add_feeds_sequences_records...")
## LOGIC TO RUN FOR LOOP AND PUT DATA IN FeedsSequences database ##
seq_yoga_list = []
seq_id_list = []
for record in results:
seq_yoga_list = seq_yoga_list + list(str(record.seq_yoga_type).split())
seq_id_list = seq_id_list + list(str(record.key.id()).split())
#yoga_type and corresponding seq_id are associated with each other as key, value pair in a dictionary
dict_result = defaultdict(list)
for k, v in zip(seq_yoga_list, seq_id_list):
dict_result[k].append(v)
for record in results:
sequence_key = record.seq_yoga_type
record_type = rec_type
feed_seq = FeedsSequences.get_or_insert(sequence_key,record_type=record_type,sequence_list=dict_result[record.seq_yoga_type])
feed_seq.put()
logging.info("Exiting for add_feeds_sequences_records...")
return
app = template.webapp2.WSGIApplication([ ('/admin/feed/test_create_dummy_TestSequences_records', FeedModelCheck), ('/admin/feed/add-top-searches-alltime', FeedModelCheck), ('/admin/feed/add-top-searches-daily', FeedModelCheck), ], debug=True)
Upvotes: 0
Reputation: 11
This is the final code i am using.Please have a look if i am using the taskqueue correctly?
def myNDBfn(self):
logging.info("Entering myNDBfn...")
urlsafe_cursor = self.request.get('cursor', None)
cursor_from_url = Cursor(urlsafe=urlsafe_cursor) if urlsafe_cursor else None
logging.info("Cursor in start is: {}".format(cursor_from_url))
gql_query_text = 'Select * From TestSequences' #Define query conditions here
#Execute query
query_o = ndb.gql(gql_query_text)
BATCH_SIZE = 500
results, next_cursor, more = query_o.fetch_page(BATCH_SIZE, start_cursor=cursor_from_url)
if more:
pass
if next_cursor:
pass
if not results:
logging.info("feed_sequence_update_second_task: all done")
return
logging.info(" results is {}".format(results))
#Calling fn to insert data in FeedSequence
record_type = 'daily'
FeedModelCheck.add_feeds_sequences_records(self,results,record_type)
cursor = query_o.fetch()
taskqueue.add(url='/admin/feed/add-top-searches-daily', params={'cursor': cursor})
logging.info("Exiting myNDBfn...")
return
Upvotes: 0
Reputation: 5276
EDIT - I rewrote my answer, because i misread your code.
You were constructing the Cursor correctly in the first place. The issue is that you were ignoring the results of fetch_page()
I rewrote your function
def myNDBfn(self):
urlsafe_cursor = self.request.get('cursor', None)
cursor_from_url = Cursor(urlsafe=urlsafe_cursor) if urlsafe_cursor else None
logging.info("Cursor in start is: {}".format(cursor_from_url))
gql_query_text = 'Select * From TestSequences' #Define query conditions here
#Execute query
results, next_cursor, more = ndb.gql(gql_query_text).fetch_page(500, start_cursor=cursor_from_url)
if not results:
logging.info("feed_sequence_update_second_task: all done")
return
logging.info("result is {}".format(results))
logging.info("next cursor is {}. urlsafe: {}".format(next_cursor, next_cursor.urlsafe()))
logging.info("Exiting myNDBfn....")
return
Upvotes: 0
Reputation: 11
I found a fix but not sure if it is the right way of using cursor in NDB Model: def feed_sequence_update_alltime(self):
logging.info("In feed_sequence_update_alltime")
cursor_from_url = ndb.Cursor.from_websafe_string(self.request.get('cursor'))
logging.info("Cursor in start is: {}".format(cursor_from_url))
gql_query_text = 'Select * From TestSequences' #Define query conditions here
#Execute query
query_o = ndb.gql(gql_query_text)
if cursor_from_url:
query_o.fetch_page(20)
BATCH_SIZE = 500
results = query_o.fetch(BATCH_SIZE)
else:
BATCH_SIZE = 1
results = query_o.fetch(BATCH_SIZE)
if not results:
logging.info("feed_sequence_update_task: all done")
return
logging.info("result is {}".format(results))
#Calling fn to insert data in FeedSequence
record_type = 'alltime'
FeedModelCheck.add_feeds_sequences_records(self,results,record_type)
cursor = query_o.fetch()
taskqueue.add(url='/admin/feed/add-top-searches-alltime', params={'cursor': cursor})
logging.info("Exiting feed_sequence_update_alltime")
return
Upvotes: 1