Venkat S. Rao

Reputation: 1110

App Engine DeadlineExceededError for cron jobs and task queue in a Wikipedia crawler

I am trying to build a Wikipedia link crawler on Google App Engine. I want to store an index in the datastore, but I run into the DeadlineExceededError for both cron jobs and the task queue.

For the cron job I have this code:

def buildTree(self):
    start=time.time()
    self.log.info(" Start Time: %f" % start)
    nobranches=TreeNode.all()

    for tree in nobranches:
        if tree.branches==[]:
            self.addBranches(tree)
            time.sleep(1)
        if (time.time()-start) > 10:
            break
        self.log.info("Time Elapsed: %f" % (time.time()-start))

    self.log.info(" End Time: %f" % time.clock())

I don't understand why the for loop doesn't break after 10 seconds; it does on the dev server. Something must be wrong with time.time() on the server. Is there another function I can use?

For the task queue I have this code:

def addNewBranch(self, keyword, level=0):

    self.log.debug("Add Tree")        
    self.addBranches(keyword)

    t=TreeNode.gql("WHERE name=:1", keyword).get()
    branches=t.nodes

    if level < 3:
        for branch in branches:
            if branch.branches == []:
                taskqueue.add(url="/addTree/%s" % branch.name)
                self.log.debug("url:%s" % "/addTree/%s" % branch.name)

The logs show that both run into the DeadlineExceededError. Shouldn't background processing have a longer deadline than the 30 seconds allowed for a page request? Is there a way around the exception?

Here is the code for addBranches():

def addBranches(self, keyword):
    tree=TreeNode.gql("WHERE name=:1", keyword).get()
    if tree is None:
        tree=TreeNode(name=keyword)

    self.log.debug("in addBranches arguments: tree %s", tree.name)
    t=urllib2.quote(tree.name.encode('utf8'))
    s="http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
    self.log.debug(s)
    try:
        usock = urllib2.urlopen(s)
    except:
        self.log.error("Could not retrieve doc: %s" % tree.name)
        usock=None

    if usock is not None:
        try:
            xmldoc=minidom.parse(usock)
        except Exception, error:
            self.log.error("Parse Error: %s" % error)
            return None
        usock.close()
        try:
            pyNode=xmldoc.getElementsByTagName('pl')
            self.log.debug("Nodes to be added: %d" % pyNode.length)
        except Exception, e:
            pyNode=None
            self.log.error("Getting Nodes Error: %s" % e)
            return None
        newNodes=[]
        if pyNode is not None:
            for child in pyNode:
                node=TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()
                if node is None:
                    newNodes.append(TreeNode(name=child.attributes["title"].value))
                else:
                    tree.branches.append(node.key())
            db.put(newNodes)
            for node in newNodes:
                tree.branches.append(node.key())
                self.log.debug("Node Added: %s" % node.name)
            tree.put()
            return tree.branches
Upvotes: 2

Views: 1210

Answers (4)

TheJacobTaylor

Reputation: 4141

I have had great success with datetimes on GAE.

from datetime import datetime, timedelta
time_start = datetime.now()
time_taken = datetime.now() - time_start

time_taken will be a timedelta. You can compare it against another timedelta that has the duration you are interested in.

ten_seconds = timedelta(seconds=10)
if time_taken > ten_seconds:
    pass  # ...do something quick.

It sounds like you would be far better served using mapreduce or Task Queues. Both are great fun for dealing with huge numbers of records.

A cleaner pattern for the code you have is to fetch only some records.

nobranches=TreeNode.all().fetch(100)

This code will only pull 100 records. If you got a full 100 back, then when you are done you can throw another task on the queue to launch off more.
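
A minimal sketch of that chaining pattern, assuming a handler behind a hypothetical /processBatch URL; the cursor plumbing uses the old db API's query cursors and is not part of the original code.

from google.appengine.api import taskqueue

def processBatch(self, cursor=None):
    query = TreeNode.all()
    if cursor:
        query.with_cursor(cursor)   # resume where the previous task stopped
    trees = query.fetch(100)        # only pull 100 records per task
    for tree in trees:
        if tree.branches == []:
            self.addBranches(tree)
    if len(trees) == 100:
        # There is probably more work left: chain another task instead of looping on.
        taskqueue.add(url="/processBatch", params={"cursor": query.cursor()})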

-- Based on comment about needing trees without branches --

I do not see your model up there, but if I were trying to create a list of all of the trees without branches and process them, I would:

  • Fetch the keys only for trees, in blocks of 100 or so.
  • Fetch all of the branches that belong to those trees using an IN query.
  • Scan the list of branches; the first time you find a tree's key, pull that tree key from the candidate list.
  • When done, you will have a list of "branchless" tree keys. Schedule each one of them for processing.
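
A rough sketch of that procedure, assuming a hypothetical Branch kind with a tree reference property (the question's real model isn't shown, so adapt the filter to whatever actually links branches to trees):

def findBranchlessTrees():
    # Keys-only fetches are cheap. Note that IN filters only accept a small
    # number of values (30 in the old datastore API), so the blocks may need
    # to be smaller than 100.
    tree_keys = TreeNode.all(keys_only=True).fetch(30)
    branches = Branch.all().filter("tree IN", tree_keys).fetch(1000)
    branchless = set(tree_keys)
    for branch in branches:
        # The first branch seen for a tree removes that tree from the candidates,
        # so no explicit ordering is needed when working with a set.
        branchless.discard(Branch.tree.get_value_for_datastore(branch))
    return list(branchless)   # schedule each of these tree keys for processing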

A simpler version is to use MapReduce on the trees. For each tree, find one branch that matches its ID. If you cannot, flag the tree for follow up. By default, this function will pull batches of trees (I think 25) with 8 simultaneous workers. And, it manages the job queues internally so you don't have to worry about timing out.
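
A hedged sketch of that idea with the App Engine mapreduce library, assuming the library is bundled with the app and this handler is registered in mapreduce.yaml with TreeNode as the entity kind; rather than setting a flag property, it simply schedules the question's existing /addTree handler for each branchless tree.

from google.appengine.api import taskqueue

def flag_branchless(tree):
    # The mapper framework calls this once per TreeNode entity.
    if not tree.branches:
        # "Flag for follow up" by enqueuing the existing task handler.
        taskqueue.add(url="/addTree/%s" % tree.name)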

Upvotes: 2

kevpie

Reputation: 26108

When a DeadlineExceededError happens, you want the request to eventually succeed if it is called again. This may require that your crawling state is guaranteed to have made some progress that can be skipped the next time. (Not addressed here.)

Parallelized calls can help tremendously.

  • Urlfetch
  • Datastore Put (mix entities together in a single db.put)
  • Datastore Query (queries in parallel - asynctools)

Urlfetch:

  • When you make your urlfetch calls be sure to use the asynchronous mode to collapse your loop.
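
A minimal sketch of that asynchronous pattern, assuming a list of already-built Wikipedia API URLs (the fetch_all name and urls argument are illustrative):

from google.appengine.api import urlfetch

def fetch_all(urls):
    rpcs = []
    for url in urls:
        rpc = urlfetch.create_rpc(deadline=10)
        urlfetch.make_fetch_call(rpc, url)   # all requests start right away
        rpcs.append(rpc)
    # Collect the results; the fetches ran in parallel instead of one after another.
    return [rpc.get_result() for rpc in rpcs]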

Datastore

  • Combine Entities being put into a single round trip call.

    # put newNodes and tree in the same round trip
    db.put(newNodes + [tree])
    
  • Pull the TreeNode.gql calls from inside the loop up into a parallel query tool like asynctools: http://asynctools.googlecode.com

Asynctools Example

    if pyNode is not None:

        runner = AsyncMultiTask()
        titles = []
        for child in pyNode:
            title = child.attributes["title"].value
            titles.append(title)
            query = db.GqlQuery("SELECT __key__ FROM TreeNode WHERE name = :1", title)
            runner.append(QueryTask(query, limit=1, client_state=title))

        # kick off the work
        runner.run()

        # peel out the results
        treeNodes = []
        for task in runner:
            task_result = task.get_result() # will raise any exception that occurred for the given query
            treeNodes.append(task_result)

        newNodes = []
        for title, node in zip(titles, treeNodes):
            if node is None:
                newNodes.append(TreeNode(name=title))
            else:
                # keys-only query, so the result is already a key
                tree.branches.append(node)
        for node in newNodes:
            tree.branches.append(node.key())
            self.log.debug("Node Added: %s" % node.name)

        # put newNodes and tree at the same time
        db.put(newNodes + [tree])
        return tree.branches

DISCLOSURE: I am associated with asynctools.

Upvotes: 1

Nick Johnson

Reputation: 101149

The problem here is that you're doing a query operation for every link in your document. Since Wikipedia pages can contain a lot of links, this means a lot of queries - and hence you run out of processing time. This approach is also going to consume your quota at a fantastic rate!

Instead, you should use the page name of the Wikipedia page as the key name of the entity. Then, you can collect up all the links from the document into a list, construct keys from them (which is an entirely local operation), and do a single batch db.get for all of them. Once you've updated and/or created them as appropriate, you can do a batch db.put to store them all to the datastore - reducing your total datastore operations from numlinks*2 to just 2!
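
A hedged sketch of that batched approach; the storeLinks name, titles list, and parent_tree argument are illustrative, not from the question:

from google.appengine.ext import db

def storeLinks(parent_tree, titles):
    # One batch read: key names are the page titles, so no queries are needed.
    nodes = TreeNode.get_by_key_name(titles)
    to_put = []
    for title, node in zip(titles, nodes):
        if node is None:
            node = TreeNode(key_name=title, name=title)
            to_put.append(node)
        # Building a key from the title is a purely local operation.
        parent_tree.branches.append(db.Key.from_path("TreeNode", title))
    to_put.append(parent_tree)
    db.put(to_put)   # one batch write for the new nodes plus the parent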

Upvotes: 1

Amber

Reputation: 527213

There is no way "around" the deadline exception aside from making your code execute within the allowed timeframe.

Upvotes: 1
