Reputation: 135
I am trying to get the sender filter working, e.g.:
@celery.task
def run_timer(crawl_start_time):
    return crawl_start_time

@task_success.connect
def run_timer_success_handler(sender, result, **kwargs):
    print '##################################'
    print 'in run_timer_success_handler'
The above works fine, but if I try to filter by sender, it never works:
@task_success.connect(sender='tasks.run_timer')
def run_timer_success_handler(sender, result, **kwargs):
    print '##################################'
    print 'in run_timer_success_handler'
I also tried:

@task_success.connect(sender='run_timer')
@task_success.connect(sender=run_timer)
@task_success.connect(sender=globals()['run_timer'])
None of them work.

How do I effectively use the sender filter to ensure that my callback is called only for the run_timer task and not for any others?
Upvotes: 3
Views: 2975
Reputation: 2227
From http://docs.celeryproject.org/en/latest/userguide/signals.html#task-success:

"Sender is the task object executed. (Not the same as after_task_publish.sender.)"

So you should use:

@task_success.connect(sender=run_timer)
def ...
It works for me. Good Luck.
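To see why the task-object form matches while a string can fail, here is a minimal stand-in for id-based signal filtering. This is not Celery's actual implementation; MiniSignal and all names below are illustrative, but the filtering logic mirrors the _make_id approach quoted in the other answer:

```python
# Minimal sketch of id-based signal filtering (illustrative, not Celery's code).
def _make_id(target):
    return id(target)

class MiniSignal:
    def __init__(self):
        # Each entry is (sender_id or None, receiver).
        self._receivers = []

    def connect(self, receiver=None, sender=None):
        if receiver is None:
            # Used as @signal.connect(sender=...): return a decorator.
            def decorator(fn):
                self._receivers.append(
                    (None if sender is None else _make_id(sender), fn))
                return fn
            return decorator
        # Used as @signal.connect (bare): receive from all senders.
        self._receivers.append(
            (None if sender is None else _make_id(sender), receiver))
        return receiver

    def send(self, sender, **kwargs):
        for sender_id, receiver in self._receivers:
            # None means "no filter"; otherwise compare by object identity.
            if sender_id is None or sender_id == _make_id(sender):
                receiver(sender=sender, **kwargs)

task_success = MiniSignal()

def run_timer():          # stands in for the Celery task
    pass

calls = []

@task_success.connect(sender=run_timer)
def handler(sender, **kwargs):
    calls.append(sender)

# Same object registered and sent -> same id -> the handler fires.
task_success.send(sender=run_timer)
assert calls == [run_timer]
```

Because the filter compares id(sender), passing the task object itself works as long as the signal fires in the same process that registered the handler.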
Upvotes: 4
Reputation: 94
In this case it is currently better to filter the sender inside the function, like:

@task_success.connect
def ...
    if sender == '...':
        ...
This is because the current Celery signals implementation has an issue when the task sender and the worker are different Python processes: it converts your sender into an identifier and uses that identifier for filtering, but Celery sends the task by its string name. Here is the problematic code (celery.utils.dispatch.signals):

def _make_id(target):  # pragma: no cover
    if hasattr(target, 'im_func'):
        return (id(target.im_self), id(target.im_func))
    return id(target)
And id('tasks.run_timer') in your process is not the same as id('tasks.run_timer') in a worker process. If you want, you may hack it and replace id with a hash function.
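You can see the failure mode without spawning workers at all: in CPython, two equal strings are not guaranteed to be the same object, so identity-based filtering silently misses the match (in real Celery the two id() calls additionally happen in different processes):

```python
# Two equal strings need not be the same object in CPython, so an
# id()-based filter drops the match even though the names compare equal.
registered = 'tasks.run_timer'               # what you pass to connect()
received = ''.join(['tasks.', 'run_timer'])  # name arriving at dispatch time

assert registered == received            # equal by value...
assert id(registered) != id(received)    # ...but different objects, so no match
```

This is why comparing the sender by string equality inside the handler is reliable, while relying on object identity is not.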
Upvotes: 4