Reputation: 19
I set up a periodic task using celery beat. The task runs and I can see the result in the console. I want a Python script that collects the results returned by the tasks.
I could do it like this:
#client.py
from cfg_celery import app
task_id = '337fef7e-68a6-47b3-a16f-1015be50b0bc'
try:
    # look the result up by its task id (not the builtin `id`)
    x = app.AsyncResult(task_id)
    print(x.get())
except Exception:
    print('some error')
Anyway, as you can see, for this test I had to copy the task_id printed in the celery beat console and hardcode it in my script. Obviously this is not going to work in real production.
I hacked around it by setting the task_id in the celery config file:
#cfg_celery.py
from celery import Celery
app = Celery('celery_config',
             broker='redis://localhost:6379/0',
             include=['taskos'],
             backend='redis'
             )
app.conf.beat_schedule = {
    'something': {
        # the task lives in taskos.py, so its name is 'taskos.add'
        'task': 'taskos.add',
        'schedule': 10.0,
        'args': (16, 54),
        'options': {'task_id': "my_custom_id"},
    }
}
This way I can read it like this:
#client.py
from cfg_celery import app
task_id = 'my_custom_id'
try:
    # look the result up by its task id (not the builtin `id`)
    x = app.AsyncResult(task_id)
    print(x.get())
except Exception:
    print('some error')
The problem with this approach is that every run reuses the same task_id, so I lose the previous results (the ones produced before client.py is called).
Is there some way I can read a list of the task_id's in the celery backend?
If I have more than one periodic task, can I get a list of task_id's for each periodic task?
Can I use app.tasks.key() to accomplish this, and how?
PS: I'm not a native English speaker and I'm new to celery, so please be nice if I used some terminology wrong.
Upvotes: 0
Views: 1119
Reputation: 19
OK. I am not sure whether nobody answered this because it is difficult or because my question is too dumb. Anyway, what I wanted to do was to get the results of my 'celery-beat' tasks from another Python process. Within the same process there was no problem: I could access the task id and everything was easy from there on. But from another process I didn't find a way to retrieve a list of the finished tasks.
I tried python-RQ (it is nice), but when I saw that I couldn't do that with RQ either, I came to understand that I had to use the redis storage capabilities manually. So I got what I wanted by doing this:
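For what it is worth, one thing that might also work from a separate process: with the Redis result backend, Celery stores each result under a key that starts with celery-task-meta-, with a JSON body by default. The sketch below scans for those keys. The function name list_stored_results is made up for illustration, and client is assumed to be a redis-py Redis() instance connected to the same server as the backend. Note that the stored metadata does not, by default, say which periodic task produced a result, and results expire after a while.

```python
import json

# Key prefix used by Celery's Redis result backend.
KEY_PREFIX = "celery-task-meta-"


def list_stored_results(client):
    """Return {task_id: (status, result)} for results found in the backend.

    `client` is assumed to be a redis-py client (e.g. redis.Redis()).
    Assumes the default JSON result serializer.
    """
    results = {}
    for key in client.scan_iter(match=KEY_PREFIX + "*"):
        raw = client.get(key)
        if raw is None:
            # the key expired between the scan and the read
            continue
        meta = json.loads(raw)
        name = key.decode() if isinstance(key, bytes) else key
        task_id = name[len(KEY_PREFIX):]
        results[task_id] = (meta.get("status"), meta.get("result"))
    return results


# Usage (needs a running redis server):
#   from redis import Redis
#   print(list_stored_results(Redis()))
```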
- Use 'bind=True' to be able to introspect from within the task function.
- Once I have the result of the function, I write it to a list in redis (I used a little trick to limit the size of this list).
- Now, from an independent process, I can connect to the same redis server and retrieve the results stored in that list.
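The "limit the size of the list" step can also be sketched with LPUSH followed by LTRIM instead of a loop. This is only an illustration, not the code I actually ran: push_bounded is a made-up helper name, client is assumed to be a redis-py Redis() instance, and storing each entry as a JSON array [task_id, timestamp, result] is my assumption.

```python
import json
import time


def push_bounded(client, key, task_id, result, max_len=5):
    """Push one result to the head of a redis list and keep only the newest ones.

    `client` is assumed to be a redis-py client (e.g. redis.Redis()).
    """
    # serialize to a string, since redis values must be bytes/str/numbers
    entry = json.dumps([task_id, time.time(), result])
    client.lpush(key, entry)
    # keep indexes 0..max_len-1 (the newest entries), drop the rest
    client.ltrim(key, 0, max_len - 1)
    return entry


# Usage inside a bound task (needs a running redis server):
#   push_bounded(rds, 'my_results', self.request.id, result)
```

Since LPUSH puts new entries at the head, trimming to the first max_len indexes drops the oldest ones.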
My files ended up being like this:
cfg_celery.py : here I define the way the tasks are going to be called.
#cfg_celery.py
from celery import Celery
appo = Celery('celery_config',
              broker='redis://localhost:6379/0',
              include=['taskos'],
              backend='redis'
              )
'''
urlea is decorated as a periodic_task, so there is no need to register it here.
But since add needs args, I register it manually in order to pass them.
'''
appo.conf.beat_schedule = {
    'q_loco': {
        'task': 'taskos.add',
        'schedule': 10.0,
        'args': (16, 54),
        # 'options' : {'task_id':"lcura"},
    }
}
taskos.py : these are the tasks.
#taskos.py
from cfg_celery import appo
from celery.decorators import periodic_task
from redis import Redis
from datetime import timedelta
import json
import requests, time
rds = Redis()
@appo.task(bind=True)
def add(self, a, b):
    # result of the operation. very dummy.
    result = a + b
    # store (task id, timestamp, result) in redis, serialized to JSON
    # because redis-py 3+ rejects tuples as values
    r = json.dumps((self.request.id, time.time(), result))
    rds.lpush('my_results', r)
    # for this test I want to have at most 5 results stored in redis
    length = rds.llen('my_results')
    while length > 5:
        x = rds.rpop('my_results')
        print('popping out', x)
        length = rds.llen('my_results')
    time.sleep(1)
    return result
@periodic_task(run_every=20)
def urlea(url='https://www.fullstackpython.com/'):
    inicio = time.time()
    R = dict()
    try:
        resp = requests.get(url)
        R['vato'] = url + " = " + str(resp.status_code*10)
        R['num palabras'] = len(resp.text.split())
    except Exception:
        R['vato'] = None
        R['num palabras'] = 0
    print('u {} : {}'.format(url, time.time()-inicio))
    time.sleep(0.8)  # trick so that the difference shows more clearly
    return R
consumer.py : the independent process that can get the results.
#consumer.py
from redis import Redis
nombre_lista = 'my_results'
rds = Redis()
tamaño = rds.llen(nombre_lista)
ultimos_resultados = list()
# rpop removes each entry from the list as it is read (oldest first)
for i in range(tamaño):
    ultimos_resultados.append(rds.rpop(nombre_lista))
print(ultimos_resultados)
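Keep in mind that consumer.py empties the list, because rpop removes what it reads. If the results should stay in redis, something like the following could be used instead. It is a rough sketch only: peek_results is a made-up name, client is assumed to be a redis-py Redis() instance, and it assumes every entry was stored as a JSON array [task_id, timestamp, result].

```python
import json


def peek_results(client, key="my_results"):
    """Read every stored entry without removing anything from the list.

    `client` is assumed to be a redis-py client (e.g. redis.Redis()).
    Entries are assumed to be JSON arrays: [task_id, timestamp, result].
    """
    # LRANGE 0 -1 returns the whole list, head (newest) first
    raw_entries = client.lrange(key, 0, -1)
    return [tuple(json.loads(raw)) for raw in raw_entries]


# Usage (needs a running redis server):
#   from redis import Redis
#   for task_id, timestamp, result in peek_results(Redis()):
#       print(task_id, timestamp, result)
```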
I am relatively new to programming and I hope that this answer can help noobs like me. If I got something wrong feel free to make the corrections as necessary.
Upvotes: 1