Reputation: 949
To my knowledge, Celery acts as both the producer and consumer of messages. This is not what I want to achieve. I want Celery to act as the consumer only, to fire certain tasks based on messages that I send to my AMQP broker of choice. Is this possible?
Or do I need to make soup by adding carrot to my stack?
Upvotes: 7
Views: 11490
Reputation: 241
Celery Custom Consumer will be a feature released in 3.1v and now is under development, you can read https://docs.celeryq.dev/en/stable/userguide/extending.html about it.
Upvotes: 1
Reputation: 669
In order to consume message from celery you need to create message thats celery can consume. You can create celery message as follows:-
def get_celery_worker_message(task_name,args,kwargs,routing_key,id,exchange=None,exchange_type=None):
message=(args, kwargs, None)
application_headers={
'lang': 'py',
'task': task_name,
'id':id,
'argsrepr': repr(args),
'kwargsrepr': repr(kwargs)
#, 'origin': '@'.join([os.getpid(), socket.gethostname()])
}
properties={
'correlation_id':id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
body, content_type, content_encoding = prepare(
message, 'json', 'application/json', 'utf-8',None, application_headers)
prep_message = prepare_message(body,None,content_type,content_encoding,application_headers,properties)
inplace_augment_message(prep_message, exchange, exchange_type, routing_key,id)
# dump_json = json.dumps(prep_message)
# print(f"json encoder:- {dump_json}")
return prep_message
You need to prep the message by first defining serializer, content_type, content_encoding, compression, headers based on the consumer.
def prepare( body, serializer=None, content_type=None,
content_encoding=None, compression=None, headers=None):
# No content_type? Then we're serializing the data internally.
if not content_type:
serializer = serializer
(content_type, content_encoding,
body) = dumps(body, serializer=serializer)
else:
# If the programmer doesn't want us to serialize,
# make sure content_encoding is set.
if isinstance(body, str):
if not content_encoding:
content_encoding = 'utf-8'
body = body.encode(content_encoding)
# If they passed in a string, we can't know anything
# about it. So assume it's binary data.
elif not content_encoding:
content_encoding = 'binary'
if compression:
body, headers['compression'] = compress(body, compression)
return body, content_type, content_encoding
def prepare_message( body, priority=None, content_type=None,
content_encoding=None, headers=None, properties=None):
"""Prepare message data."""
properties = properties or {}
properties.setdefault('delivery_info', {})
properties.setdefault('priority', priority )
return {'body': body,
'content-encoding': content_encoding,
'content-type': content_type,
'headers': headers or {},
'properties': properties or {}}
Once the message is created you need to add arguments to make it readable by celery consumer.
def inplace_augment_message(message, exchange,exchange_type, routing_key,next_delivery_tag):
body_encoding_64 = 'base64'
message['body'], body_encoding = encode_body(
str(json.dumps(message['body'])), body_encoding_64
)
props = message['properties']
props.update(
body_encoding=body_encoding,
delivery_tag=next_delivery_tag,
)
if exchange and exchange_type:
props['delivery_info'].update(
exchange=exchange,
exchange_type=exchange_type,
routing_key=routing_key,
)
elif exchange:
props['delivery_info'].update(
exchange=exchange,
routing_key=routing_key,
)
else:
props['delivery_info'].update(
exchange=None,
routing_key=routing_key,
)
class Base64:
"""Base64 codec."""
def encode(self, s):
return bytes_to_str(base64.b64encode(str_to_bytes(s)))
def decode(self, s):
return base64.b64decode(str_to_bytes(s))
def encode_body( body, encoding=None):
codecs = {'base64': Base64()}
if encoding:
return codecs.get(encoding).encode(body), encoding
return body, encoding
Upvotes: 0
Reputation: 5048
Celery brokers acts as a message stores and publish them to one or more workers that subscribe for those,
so: celery pulishes messages to a broker (rabbitmq, redist, celery itself through django db, etc..) those messages are retrieved by a worker following the protocol of the broker, that memorizes them (usually they are persistent but maybe it dependes on your broker), and got executed by you workers.
Task results are available on the executing worker task's, and you can configure where to store those results and you can retrieve them with this method .
You can publish tasks with celery passing parameters to your "receiver function" (the task you define, the documentation has some examples, usually you do not want to pass big things here (say a queryset), but only the minimal information that permits you to retrieve what you need when executing the task.
one easy example could be:
You register a task
@task
def add(x,x):
return x+y
and you call the from another module with:
from mytasks import add
metadata1 = 1
metadata2 = 2
myasyncresult = add.delay(1,2)
myasyncresult.get() == 3
EDIT
after your edit I saw that probably you want to construct messages from other sources other that celery, you could see here the message format, they default as pickled objects that respect that format, so you post those message in the right queue of your rabbitmq broker and you are right to go retrieving them from your workers.
Upvotes: 6
Reputation: 1666
Celery uses the message broker architectural pattern. A number of implementations / broker transports can be used with Celery including RabbitMQ and a Django database.
From Wikipedia:
A message broker is an architectural pattern for message validation, message transformation and message routing. It mediates communication amongst applications, minimizing the mutual awareness that applications should have of each other in order to be able to exchange messages, effectively implementing decoupling.
Keeping results is optional and requires a result backend. You can use different broker and result backends. The Celery Getting Started guide contains further information.
The answer to your question is yes you can fire specific tasks passing arguments without addding Carrot to the mix.
Upvotes: 1