Reputation: 363
How can I get the results of a task from Redis? I want the query results that the task returned, not the status of the task.
The log files verify that the task returns the results. According to the docs, get() can be used to return the task results, so it should work. Something tells me I am not actually saving the results to the Redis backend.
Expected behavior: run the task every 24 hours and store the DB query results in Redis, then use the Redis cache to serve those results on application calls.
Here is my task function.
@shared_task(name='get_top_ten_gainers', ignore_result=False)
def get_top_ten_gainers():
    from collections import namedtuple
    query = (
        db_session.execute(
            """WITH p AS (
                SELECT CompanyId,
                       100 * (MAX(CASE WHEN rn = 1 THEN CloseAdjusted END) / MAX(CASE WHEN rn = 2 THEN CloseAdjusted END) - 1) DayGain
                FROM (
                    SELECT *, ROW_NUMBER() OVER (PARTITION BY CompanyId ORDER BY Date DESC) rn
                    FROM DimCompanyPrice
                )
                WHERE rn <= 2
                GROUP BY CompanyId
                ORDER BY DayGain DESC
                LIMIT 10
            )
            SELECT *
            FROM p
            JOIN Company ON p.CompanyID = Company.ID"""
        )
    )
    logger.info(query)
    Gain = namedtuple('Gain', query.keys())
    gains = [Gain(*q) for q in query.fetchall()]
    payload = [[g.Symbol, g.Security, g.DayGain] for g in gains]
    logger.info("`payload` of type {}: {}".format(type(payload), payload))
    return payload
Here is my flask view helper function:
def _top_ten_gainers():
    t0 = time.perf_counter()
    from project.tasks import get_top_ten_gainers
    result = get_top_ten_gainers.apply_async()
    logger.info("`result` of type {}: {}".format(type(result), result))
    logger.info("`result.get()` of type {}: {}".format(type(result.get()), result.get()))
    total_payload['gainers'] = result.get()
    logger.info('task finished in {}'.format(time.perf_counter() - t0))
Here is the Celery output:
-------------- celery@desktop v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- Linux-5.11.0-41-generic-x86_64-with-glibc2.29 2021-12-02 18:07:16
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: default:0x7fb6f8bb5b80 (.default.Loader)
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2021-12-02 18:07:17,276: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-12-02 18:07:17,287: INFO/MainProcess] mingle: searching for neighbors
[2021-12-02 18:07:18,305: INFO/MainProcess] mingle: all alone
[2021-12-02 18:07:18,320: INFO/MainProcess] celery@desktop ready.
[2021-12-09 15:08:01,235: INFO/MainProcess] Task get_top_ten_gainers[26ce0da8-662c-4690-b95e-9d1e42e79e34] received
[2021-12-09 15:08:05,068: INFO/ForkPoolWorker-15] <sqlalchemy.engine.cursor.CursorResult object at 0x7f40c876e970>
[2021-12-09 15:08:05,087: INFO/ForkPoolWorker-15] `payload` of type <class 'list'>: [['MA', 'Mastercard', 3.856085884230387], ['GM', 'General Motors', 3.194377894904976], ['LH', 'LabCorp', 2.6360513469535274], ['CCI', 'Crown Castle', 2.491537650518838], ['DHI', 'D. R. Horton', 2.451821208757954], ['PEAK', 'Healthpeak Properties', 2.3698069046225845], ['BDX', 'Becton Dickinson', 2.355495473352187], ['DD', 'DuPont', 2.19100399536023], ['HLT', 'Hilton Worldwide', 1.9683928319458088], ['JKHY', 'Jack Henry & Associates', 1.7102615694164935]]
[2021-12-09 15:08:05,090: INFO/ForkPoolWorker-15] Task get_top_ten_gainers[26ce0da8-662c-4690-b95e-9d1e42e79e34] succeeded in 3.8531467270004214s: None
Here is my log file output:
[2021-12-09 15:08:01,216][index ][INFO ] `result` of type <class 'celery.result.AsyncResult'>: 26ce0da8-662c-4690-b95e-9d1e42e79e34
[2021-12-09 15:08:05,068][tasks ][INFO ] <sqlalchemy.engine.cursor.CursorResult object at 0x7f40c876e970>
[2021-12-09 15:08:05,087][tasks ][INFO ] `payload` of type <class 'list'>: [['MA', 'Mastercard', 3.856085884230387], ['GM', 'General Motors', 3.194377894904976], ['LH', 'LabCorp', 2.6360513469535274], ['CCI', 'Crown Castle', 2.491537650518838], ['DHI', 'D. R. Horton', 2.451821208757954], ['PEAK', 'Healthpeak Properties', 2.3698069046225845], ['BDX', 'Becton Dickinson', 2.355495473352187], ['DD', 'DuPont', 2.19100399536023], ['HLT', 'Hilton Worldwide', 1.9683928319458088], ['JKHY', 'Jack Henry & Associates', 1.7102615694164935]]
[2021-12-09 15:08:05,090][index ][INFO ] `result.get()` of type <class 'NoneType'>: None
Upvotes: 0
Views: 837
Reputation: 11337
It looks like your task takes about 4 seconds and you are not waiting for it to finish. You can wait for it like this:
from time import sleep

def _top_ten_gainers():
    from project.tasks import get_top_ten_gainers
    result = get_top_ten_gainers.delay()
    # Poll once per second until the worker has stored a result.
    while not result.ready():
        sleep(1)
    gains = result.get()
    logger.info("Task results: {}".format(gains))
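A polling loop like the one above never exits if the task never finishes, so it may be worth bounding the wait. Below is a minimal sketch of that idea, not a definitive implementation. It assumes only that the result object exposes ready() and get(), as Celery's AsyncResult does. The names wait_for_result and FakeResult are hypothetical and exist only so the example runs without a broker.

```python
import time


class FakeResult:
    """Hypothetical stand-in for celery.result.AsyncResult (illustration only)."""

    def __init__(self, value, ready_after_checks):
        self._value = value
        self._checks_left = ready_after_checks

    def ready(self):
        # Report "not ready" for the first few checks, then "ready".
        if self._checks_left > 0:
            self._checks_left -= 1
            return False
        return True

    def get(self):
        return self._value


def wait_for_result(result, timeout=10.0, poll_interval=1.0):
    """Poll result.ready() until it is true or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "task did not finish within {} seconds".format(timeout)
            )
        time.sleep(poll_interval)
    return result.get()


# The fake result becomes ready on the third check.
gains = wait_for_result(
    FakeResult([["MA", "Mastercard", 3.86]], ready_after_checks=2),
    poll_interval=0.01,
)
print(gains)
```

With a real task you would pass the object returned by delay() or apply_async() instead of FakeResult. Note that AsyncResult.get() also accepts a timeout argument and by default blocks until the result is available, so an explicit loop is mainly useful when you want to do other work between checks.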
Upvotes: 0