jg mtz

Reputation: 19

How to get Celery task IDs

I set up a periodic task using celery beat. The task runs and I can see the result in the console. I want a Python script that collects the results returned by the tasks.

I could do it like this:

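#client.py
from cfg_celery import app

# task id copied by hand from the celery beat console output
task_id = '337fef7e-68a6-47b3-a16f-1015be50b0bc'
try:
    # look the result up by its task id
    x = app.AsyncResult(task_id)
    print(x.get())
except Exception as exc:
    print('some error:', exc)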

As you can see, for this test I had to copy the task_id printed in the celery beat console and hardcode it in my script. Obviously this is not going to work in production.

I worked around it by setting the task_id in the celery config file:

#cfg_celery.py
from celery import Celery

app = Celery('celery_config',
        broker='redis://localhost:6379/0',
        include=['taskos'],
        backend='redis'
        )
app.conf.beat_schedule = {
    'something': {
        'task': 'taskos.add',  # module name must match the one in `include`
        'schedule': 10.0,      # seconds
        'args': (16, 54),
        # force every run to use the same, known task id
        'options': {'task_id': "my_custom_id"},
    }
}

This way I can read it like this:

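#client.py
from cfg_celery import app

# the fixed id configured in beat_schedule
task_id = 'my_custom_id'
try:
    # look the result up by its task id
    x = app.AsyncResult(task_id)
    print(x.get())
except Exception as exc:
    print('some error:', exc)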

The problem with this approach is that every run reuses the same task_id, so I lose the earlier results (the ones produced before client.py is called).

Is there some way to read a list of the task_ids stored in the celery backend? If I have more than one periodic task, can I get a list of task_ids for each periodic task? Can I use app.tasks.keys() to accomplish this, and if so, how?

P.S.: I am not a native English speaker and I am new to celery, so please be nice if I used some terminology wrong.

Upvotes: 0

Views: 1119

Answers (1)

jg mtz

Reputation: 19

OK. I am not sure whether nobody answered this because it is difficult or because my question is too dumb. Anyway, what I wanted was to get the results of my celery beat tasks from another Python process. Within the same process there was no problem: I could access the task id and everything was easy from there on. But from another process I didn't find a way to retrieve a list of the finished tasks.
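
A possible alternative, offered only as a sketch: with the Redis result backend, Celery stores each result under a key of the form celery-task-meta-<task_id>, so another process may be able to recover the task ids by scanning Redis for that prefix. The helper name task_ids_from_keys is made up for illustration, and stored results only stay in Redis until they expire.

```python
# Prefix the Celery Redis result backend puts in front of every task id.
KEY_PREFIX = 'celery-task-meta-'


def task_ids_from_keys(keys):
    """Extract task ids from result-backend key names (hypothetical helper)."""
    ids = []
    for key in keys:
        # redis-py returns keys as bytes unless decode_responses=True is set
        if isinstance(key, bytes):
            key = key.decode('utf-8')
        if key.startswith(KEY_PREFIX):
            ids.append(key[len(KEY_PREFIX):])
    return ids
```

With redis-py the keys could come from Redis().scan_iter(match='celery-task-meta-*'), and each id could then be passed to app.AsyncResult. The key names alone do not say which periodic task produced which id.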

I tried python-RQ (it is nice), but when I saw that I couldn't do that with RQ either, I came to understand that I had to use Redis storage directly. So I got what I wanted by doing this:

- Use 'bind=True' to be able to introspect from within the task function.
- Once I have the result of the function, I write it to a list in Redis (I used a trick to limit the size of this list).
- Now, from an independent process, I can connect to the same Redis server and retrieve the results stored in that list.

My files ended up being like this:

cfg_celery.py : here I define the way the tasks are going to be called.

#cfg_celery.py
from celery import Celery

appo = Celery('celery_config',
        broker='redis://localhost:6379/0',
        include=['taskos'],
        backend = 'redis'
        )

'''
urlea is decorated as a periodic_task, so there is no need to register it here.
But since add needs args, I register it manually to pass them in.
'''
appo.conf.beat_schedule = {
    'q_loco': {
        'task': 'taskos.add',
        'schedule': 10.0,
        'args': (16, 54),
        # 'options' : {'task_id':"lcura"},
    }
}

taskos.py : these are the tasks.

#taskos.py
import json
import time

import requests
# legacy import path; it may not exist in recent Celery releases
from celery.decorators import periodic_task
from redis import Redis

from cfg_celery import appo

rds = Redis()

@appo.task(bind=True)
def add(self, a, b):
    # result of the operation. very dummy.
    result = a + b

    # store (task id, timestamp, result) in redis, serialized to JSON
    # because recent redis-py versions reject tuples as values
    r = json.dumps((self.request.id, time.time(), result))
    rds.lpush('my_results', r)

    # for this test I want to have at most 5 results stored in redis
    length = rds.llen('my_results')
    while length > 5:
        x = rds.rpop('my_results')
        print('popping out', x)
        length = rds.llen('my_results')
        time.sleep(1)
    return result


@periodic_task(run_every=20)
def urlea(url='https://www.fullstackpython.com/'):
    inicio = time.time()
    R = dict()
    try:
        resp = requests.get(url)
        R['vato'] = url + " = " + str(resp.status_code * 10)
        R['num palabras'] = len(resp.text.split())  # word count of the page
    except requests.RequestException:
        R['vato'] = None
        R['num palabras'] = 0
    print('u {} : {}'.format(url, time.time() - inicio))
    time.sleep(0.8)  # trick to make the difference easier to see
    return R
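
The trimming loop in add could probably be replaced by a single LTRIM call, which keeps only a given index range of the list. A minimal sketch, assuming redis-py's lpush and ltrim; push_capped and its parameters are hypothetical names, and client is any object exposing those two methods.

```python
def push_capped(client, key, value, max_len=5):
    """Push value to the head of the list at key, keeping at most max_len items.

    Hypothetical helper: client is expected to offer lpush and ltrim,
    as a redis-py Redis instance does.
    """
    client.lpush(key, value)
    # LTRIM keeps indexes 0..max_len-1 (the newest entries) and drops the rest.
    client.ltrim(key, 0, max_len - 1)
```

Inside the task this would be called as push_capped(rds, 'my_results', r), where r is a string or bytes value.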

consumer.py : the independent process that can get the results.

#consumer.py
from redis import Redis

nombre_lista = 'my_results'

rds = Redis()

# rpop removes each entry as it reads it, so the list is empty afterwards
tamaño = rds.llen(nombre_lista)
ultimos_resultados = list()
for i in range(tamaño):
    ultimos_resultados.append(rds.rpop(nombre_lista))

print(ultimos_resultados)
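
Because rpop removes every entry it reads, a second run of consumer.py finds the list empty. If the results should stay available, LRANGE reads them without removing anything. A minimal sketch, assuming each entry was stored as a JSON string (an assumption; adjust the decoding to whatever the task actually writes); read_results is a hypothetical name.

```python
import json


def read_results(client, key):
    """Return all entries of the list at key without removing them.

    Hypothetical helper: client is expected to offer lrange, as a redis-py
    Redis instance does. Entries are assumed to be JSON strings, and they
    come back newest first when the writer uses lpush.
    """
    # LRANGE 0 -1 returns the whole list and leaves it untouched.
    raw_entries = client.lrange(key, 0, -1)
    # json.loads accepts both str and bytes, which is what redis-py returns.
    return [json.loads(entry) for entry in raw_entries]
```

With redis-py this could be called as read_results(Redis(), 'my_results').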

I am relatively new to programming and I hope that this answer can help noobs like me. If I got something wrong, feel free to make corrections as necessary.

Upvotes: 1
