Majid

Reputation: 638

Celery broadcast task not working

I've tried to make a broadcast task, but only one of my workers receives it per call. Would you please help me? (I'm using RabbitMQ and node-celery.)

from kombu import Exchange, Queue
from kombu.common import Broadcast

default_exchange = Exchange('celery', type='direct')
celery.conf.update(
    CELERY_RESULT_BACKEND = "amqp",
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES = (
        Queue('celery', default_exchange, routing_key='celery'),
        Broadcast('broadcast_tasks'),
    ),
    CELERY_ROUTES = (
        {'my_tasks.sample_broadcast_task': {
            'queue': 'broadcast_tasks',
            }},
        {'my_tasks.sample_normal_task': {
            'queue': 'celery',
            'exchange': 'celery',
            'exchange_type': 'direct',
            'routing_key': 'celery',
            }}
    ),
    )
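
For reference, the call I make from node-celery is roughly equivalent to this Python sketch (my_tasks is the module name used in CELERY_ROUTES above):

    from my_tasks import sample_broadcast_task

    # routed via CELERY_ROUTES to the 'broadcast_tasks' Broadcast queue,
    # so I would expect every connected worker to receive its own copy
    sample_broadcast_task.apply_async()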

I've also tested the following configuration, but it's not working either.

celery.conf.update(
    CELERY_RESULT_BACKEND = "amqp",
    CELERY_RESULT_SERIALIZER='json',
    CELERY_QUEUES=(
        Queue('celery', Exchange('celery'), routing_key='celery'),
        Broadcast('broadcast'),
    ),
    )
@celery.task(ignore_result=True, queue='broadcast',
             options=dict(queue='broadcast'))
def sample_broadcast_task():
    print "test"

EDIT

After changing how I run the worker by adding -Q broadcast, I now get this error:

PreconditionFailed: Exchange.declare: (406) PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'broadcast' in vhost '/': received 'direct' but current is 'fanout'
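
If I read the error right, the exchange named broadcast already exists on the broker as fanout (declared by the Broadcast queue), and something in my setup re-declares it as direct, which RabbitMQ refuses. A minimal kombu sketch that reproduces the same 406 (the broker URL is just an example):

    from kombu import Connection, Exchange

    with Connection('amqp://guest:guest@localhost//') as conn:
        channel = conn.channel()
        # what Broadcast('broadcast') declares on the broker
        Exchange('broadcast', type='fanout')(channel).declare()
        # re-declaring the same name as 'direct' raises PreconditionFailed (406)
        Exchange('broadcast', type='direct')(channel).declare()

So I assume I either have to delete the stale exchange on the broker or make sure nothing declares a direct exchange under that name.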

Upvotes: 2

Views: 2325

Answers (3)

Aji

Reputation: 133

Try this

    from celery import Celery
    from kombu.common import Broadcast

    BROKER_URL = 'amqp://guest:guest@localhost:5672//'


    class CeleryConf:
        CELERY_ACCEPT_CONTENT = ['json']
        # List of modules to import when celery starts.
        CELERY_IMPORTS = ('main.tasks',)
        CELERY_QUEUES = (Broadcast('q1'),)
        CELERY_ROUTES = {
            'tasks.sampletask': {'queue': 'q1'},
        }


    celeryapp = Celery('celeryapp', broker=BROKER_URL)
    celeryapp.config_from_object(CeleryConf())


    @celeryapp.task
    def sampletask(form):
        print form

To send the message, do

    d = sampletask.apply_async(['4c5b678350fc643'], serializer="json", queue='q1')
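
If you want CELERY_ROUTES to pick the queue instead of passing queue= explicitly, the route key has to match the task's full registered name, which here would normally be main.tasks.sampletask (a sketch based on the config above):

    CELERY_ROUTES = {
        'main.tasks.sampletask': {'queue': 'q1'},
    }

    # the explicit queue argument can then be dropped:
    d = sampletask.apply_async(['4c5b678350fc643'], serializer='json')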

Upvotes: 0

Jonas T.

Reputation: 172

After trying many, many things, I finally found a solution. This works for me (Celery 3.1.24 (Cipater) and Python 2.7.12).

WORKER - tasks.py:

from celery import Celery
import celery_config
from kombu.common import Broadcast, Queue, Exchange

app = Celery()
app.config_from_object(celery_config)

@app.task
def print_prout(x):
    print x
    return x

WORKER - celery_config.py:

# coding=utf-8
from kombu.common import Broadcast, Queue, Exchange


BROKER_URL = 'amqp://login:[email protected]//'
CELERY_RESULT_BACKEND = 'redis://:[email protected]'


CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True

CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
CELERY_DISABLE_RATE_LIMITS = True
CELERY_ALWAYS_EAGER = False

CELERY_QUEUES = (Broadcast('broadcast_tasks'), )

Worker launched with:

celery -A celery_worker.tasks worker --loglevel=info --concurrency=1 -n worker_name_1

On the client (another Docker container, in my case):

from celery import Celery
from celery_worker import tasks

result = tasks.print_prout.apply_async(['prout'], queue='broadcast_tasks')

print result.get()

The next step for me is figuring out how to retrieve and display the results returned by all the workers. The "print result.get()" seems to return only the result of the last worker, and it does not look obvious how to get the rest (see Have Celery broadcast return results from all workers).

Upvotes: 2

Chao Liu

Reputation: 21

According to your description:

I've tried to make a broadcast task, but only one of my workers receives it per call

you may be using a direct-type exchange.
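
For completeness: the broadcast behaviour comes from the fanout exchange that kombu's Broadcast queue declares. Every worker binds its own uniquely named queue to that exchange, so each worker gets a copy of the message, whereas a direct exchange delivers each message to a single queue. A minimal sketch of the fanout declaration (names are just examples):

    from kombu.common import Broadcast

    # Broadcast() declares a fanout exchange; each worker creates its own
    # bcast.<uuid> queue bound to it, so every worker receives the message
    CELERY_QUEUES = (Broadcast('broadcast_tasks'),)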

Upvotes: 1
