Ohad Perry
Ohad Perry

Reputation: 1425

app engine can't add tasks to push queues in localhost

Want to test simple task add to queue I have this script.

import os, sys

test_directory = os.path.dirname(os.path.abspath(__file__))
paths = [
    '/../../google_appengine',
]

for path in paths:
    sys.path.insert(0, os.path.abspath(test_directory + path))

from google.appengine.api import taskqueue


QUEUE_NAME = 'default'

import os
import unittest

from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):
    def setUp(self):
        self.testbed = testbed.Testbed()
        self.testbed.activate()

        # root_path must be set the the location of queue.yaml.
        # Otherwise, only the 'default' queue will be available.
        self.testbed.init_taskqueue_stub(
            root_path=os.path.join(os.path.dirname(__file__), 'resources'))
        self.taskqueue_stub = self.testbed.get_stub(
            testbed.TASKQUEUE_SERVICE_NAME)

    def tearDown(self):
        self.testbed.deactivate()

    def testTaskAddedToQueue(self):
        taskqueue.Task(name='my_task', url='/url/of/my/task/').add(QUEUE_NAME)
        tasks = self.taskqueue_stub.get_filtered_tasks(QUEUE_NAME)
        assert len(tasks) == 1
        assert tasks[0].name == 'my_task'


aaa = TaskQueueTestCase('setUp')
aaa()
aaa.testTaskAddedToQueue()

if trying to run the

 taskqueue.Task(name='my_task', url='/url/of/my/task/').add(QUEUE_NAME)

getting TaskAlreadyExistsError

but when trying to count the number of tasks getting 0

self.taskqueue_stub.get_filtered_tasks(QUEUE_NAME)  # getting [] here

also tried

q = taskqueue.Queue('slowQueue')
task = taskqueue.Task(url='/worker/slow', params={'name': 'myname'})
q.add(task)

getting UnknownQueueError

I've ran the localserver using

 dev_appserver.py -A asdfasdf  worker.yaml
INFO     2016-10-05 11:00:47,730 devappserver2.py:769] Skipping SDK update check.
INFO     2016-10-05 11:00:47,819 api_server.py:205] Starting API server at: http://localhost:62568
INFO     2016-10-05 11:00:47,822 dispatcher.py:197] Starting module "worker" running at: http://localhost:8080
INFO     2016-10-05 11:00:47,825 admin_server.py:116] Starting admin server at: http://localhost:8000
WARNING  2016-10-05 11:00:47,825 devappserver2.py:835] No default module found. Ignoring.

local server for push queues

here is the worker.yaml

queue:
- name: ohad
  bucket_size: 1
  rate: 1/s

how do I define in my code to push messages to localhost server ?

pretty new to app and engine and push queues. please help.

Upvotes: 0

Views: 674

Answers (1)

Dan Cornilescu
Dan Cornilescu

Reputation: 39834

From Naming a task (emphasis mine):

A task name must be unique within a queue. If you try to add another task with the same name to a the queue, the operation will fail. After a task is removed from a queue you can't insert a task with the same name into the queue until 10 days have passed. A task name can contain uppercase and lowercase letters, numbers, underscores, and hyphens. The maximum length for a task name is 500 characters.

You have 2 options:

  • ensure the task names you assign are unique - for example (assuming the timestamps-based names are indeed unique):

    def testTaskAddedToQueue(self):
        import time
        task_name = 'my_task_%d' % int(time.time() * 100000)
        taskqueue.Task(name=task_name, url='/url/of/my/task/').add(QUEUE_NAME)
        tasks = self.taskqueue_stub.get_filtered_tasks(QUEUE_NAME)
        assert len(tasks) == 1
        assert tasks[0].name == task_name
    
  • leave GAE auto-assign the task name (guaranteed unique - based on the number of already enqueued tasks) and get the name from the created task (instead of asserting the name matches the one assigned). For example:

    def testTaskAddedToQueue(self):
        task = taskqueue.Task(url='/url/of/my/task/')
        task.add(QUEUE_NAME)            
        tasks = self.taskqueue_stub.get_filtered_tasks(QUEUE_NAME)
        assert len(tasks) == 1
        assert tasks[0].name == task.name
    

The naming conflict prevents the task from being added to the queue, which is why len(tasks) is 0 and .get_filtered_tasks() returns [].

Upvotes: 2

Related Questions