sat1017

Celery task not returning results from Redis

How can I get the results of a task from Redis? I want the query results that the task returned, not the status of the task.

The log files confirm that the task produces the results. According to the Celery docs, get() returns the task's results, so this should work. I suspect I am not actually saving the results to the Redis backend.

Expected behavior: run the task every 24 hours and store the database query results in Redis, then read those cached results from Redis whenever the application needs them.
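The expected behavior above can be sketched as a Celery beat schedule plus a JSON round-trip through Redis. This is a minimal sketch under assumptions: the schedule dict would be assigned to app.conf.beat_schedule on the project's Celery app (not shown in the question), cache is assumed to be a redis-py client such as redis.Redis(), and the key name top_ten_gainers and the 24-hour expiry are hypothetical choices.

```python
import json
from datetime import timedelta

# Celery beat entry: run the task registered as 'get_top_ten_gainers' every 24 hours.
# Assumption: this dict is assigned to app.conf.beat_schedule on the project's Celery app.
BEAT_SCHEDULE = {
    "refresh-top-ten-gainers": {
        "task": "get_top_ten_gainers",
        "schedule": timedelta(hours=24),
    },
}

CACHE_KEY = "top_ten_gainers"      # hypothetical Redis key
CACHE_TTL_SECONDS = 24 * 60 * 60   # expire after one day (hypothetical choice)


def store_gainers(cache, payload):
    """Serialize the task's payload to JSON and write it to Redis with an expiry.

    cache is assumed to be a redis-py client; set(..., ex=...) sets the TTL in seconds.
    """
    cache.set(CACHE_KEY, json.dumps(payload), ex=CACHE_TTL_SECONDS)


def load_gainers(cache):
    """Return the cached payload, or None if the key is missing or has expired."""
    raw = cache.get(CACHE_KEY)  # redis-py returns bytes, or None for a missing key
    return json.loads(raw) if raw is not None else None
```

With this split, the task would call store_gainers(...) before returning its payload, and the Flask view would call load_gainers(...) instead of waiting on apply_async().get(), so a request never blocks on the worker.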

Here is my task function:

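from collections import namedtuple

from celery import shared_task
from sqlalchemy import text

# logger and db_session are module-level objects defined elsewhere in project/tasks.py


@shared_task(name='get_top_ten_gainers', ignore_result=False)
def get_top_ten_gainers():
    # Wrap raw SQL in text(): plain strings are deprecated in SQLAlchemy 1.4 and rejected in 2.0.
    # The outer ORDER BY is needed because the CTE's ordering is not guaranteed after the JOIN.
    query = db_session.execute(
        text(
            """WITH p AS (
            SELECT CompanyId,
            100 * (MAX(CASE WHEN rn = 1 THEN CloseAdjusted END) / MAX(CASE WHEN rn = 2 THEN CloseAdjusted END) - 1) DayGain
            FROM (
            SELECT *, ROW_NUMBER() OVER (PARTITION BY CompanyId ORDER BY Date DESC) rn
            FROM DimCompanyPrice
            )
            WHERE rn <= 2
            GROUP BY CompanyId
            ORDER BY DayGain DESC
            LIMIT 10
            )
            SELECT *
            FROM p
            JOIN Company ON p.CompanyID = Company.ID
            ORDER BY p.DayGain DESC"""
        )
    )
    logger.info(query)

    # Build plain lists so the return value is JSON-serializable for the result backend
    Gain = namedtuple('Gain', query.keys())
    gains = [Gain(*q) for q in query.fetchall()]
    payload = [[g.Symbol, g.Security, g.DayGain] for g in gains]
    logger.info("`payload` of type {}: {}".format(type(payload), payload))

    return payload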
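To check the SQL and the payload shape without Flask, Celery, or the real database, the same query can be run against an in-memory SQLite database using only the standard library. This is a sketch with hypothetical table contents (three made-up companies); it assumes the real schema has the columns the query references (Company.ID, Symbol, Security; DimCompanyPrice.CompanyId, Date, CloseAdjusted) and an SQLite version of 3.25 or later for ROW_NUMBER(). It also confirms the payload is plain JSON-serializable data, which matters because Celery's default result serializer is JSON.

```python
import json
import sqlite3

TOP_GAINERS_SQL = """
WITH p AS (
    SELECT CompanyId,
           100 * (MAX(CASE WHEN rn = 1 THEN CloseAdjusted END)
                  / MAX(CASE WHEN rn = 2 THEN CloseAdjusted END) - 1) AS DayGain
    FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY CompanyId ORDER BY Date DESC) AS rn
        FROM DimCompanyPrice
    )
    WHERE rn <= 2
    GROUP BY CompanyId
    ORDER BY DayGain DESC
    LIMIT 10
)
SELECT *
FROM p
JOIN Company ON p.CompanyId = Company.ID
ORDER BY p.DayGain DESC  -- the CTE's ORDER BY does not fix the final row order
"""


def build_demo_db():
    """Create an in-memory database holding hypothetical rows for three companies."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # rows support lookup by column name
    conn.executescript("""
        CREATE TABLE Company (ID INTEGER PRIMARY KEY, Symbol TEXT, Security TEXT);
        CREATE TABLE DimCompanyPrice (CompanyId INTEGER, Date TEXT, CloseAdjusted REAL);
        INSERT INTO Company VALUES (1, 'AAA', 'Alpha Corp'), (2, 'BBB', 'Beta Inc'), (3, 'CCC', 'Gamma Ltd');
        INSERT INTO DimCompanyPrice VALUES
            (1, '2021-12-08', 100.0), (1, '2021-12-09', 110.0),
            (2, '2021-12-08', 50.0), (2, '2021-12-09', 49.0),
            (3, '2021-12-07', 10.0), (3, '2021-12-08', 20.0), (3, '2021-12-09', 21.0);
    """)
    return conn


def top_ten_gainers(conn):
    """Run the query and build the same [Symbol, Security, DayGain] payload as the task."""
    rows = conn.execute(TOP_GAINERS_SQL).fetchall()
    return [[r["Symbol"], r["Security"], r["DayGain"]] for r in rows]


payload = top_ten_gainers(build_demo_db())
print(json.dumps(payload))  # only lists, str and float, so JSON serialization succeeds
```

With these rows the day gains come out to roughly 10% for AAA, 5% for CCC (only its two most recent closes are used) and -2% for BBB, in that order.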

Here is my Flask view helper function:

import time

# logger and total_payload are module-level objects defined elsewhere in this module


def _top_ten_gainers():
    t0 = time.perf_counter()

    from project.tasks import get_top_ten_gainers

    result = get_top_ten_gainers.apply_async()
    logger.info("`result` of type {}: {}".format(type(result), result))

    # get() blocks until the task has finished; call it once and reuse the value
    gains = result.get()
    logger.info("`result.get()` of type {}: {}".format(type(gains), gains))

    total_payload['gainers'] = gains
    logger.info('task finished in {}'.format(time.perf_counter() - t0))

Here is the Celery worker output:

 -------------- celery@desktop v5.1.2 (sun-harmonics)
--- ***** ----- 
-- ******* ---- Linux-5.11.0-41-generic-x86_64-with-glibc2.29 2021-12-02 18:07:16
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         default:0x7fb6f8bb5b80 (.default.Loader)
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
[2021-12-02 18:07:17,276: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-12-02 18:07:17,287: INFO/MainProcess] mingle: searching for neighbors
[2021-12-02 18:07:18,305: INFO/MainProcess] mingle: all alone
[2021-12-02 18:07:18,320: INFO/MainProcess] celery@desktop ready.
[2021-12-09 15:08:01,235: INFO/MainProcess] Task get_top_ten_gainers[26ce0da8-662c-4690-b95e-9d1e42e79e34] received
[2021-12-09 15:08:05,068: INFO/ForkPoolWorker-15] <sqlalchemy.engine.cursor.CursorResult object at 0x7f40c876e970>
[2021-12-09 15:08:05,087: INFO/ForkPoolWorker-15] `payload` of type <class 'list'>: [['MA', 'Mastercard', 3.856085884230387], ['GM', 'General Motors', 3.194377894904976], ['LH', 'LabCorp', 2.6360513469535274], ['CCI', 'Crown Castle', 2.491537650518838], ['DHI', 'D. R. Horton', 2.451821208757954], ['PEAK', 'Healthpeak Properties', 2.3698069046225845], ['BDX', 'Becton Dickinson', 2.355495473352187], ['DD', 'DuPont', 2.19100399536023], ['HLT', 'Hilton Worldwide', 1.9683928319458088], ['JKHY', 'Jack Henry & Associates', 1.7102615694164935]]
[2021-12-09 15:08:05,090: INFO/ForkPoolWorker-15] Task get_top_ten_gainers[26ce0da8-662c-4690-b95e-9d1e42e79e34] succeeded in 3.8531467270004214s: None

Here is my log file output:

[2021-12-09 15:08:01,216][index       ][INFO    ] `result` of type <class 'celery.result.AsyncResult'>: 26ce0da8-662c-4690-b95e-9d1e42e79e34
[2021-12-09 15:08:05,068][tasks       ][INFO    ] <sqlalchemy.engine.cursor.CursorResult object at 0x7f40c876e970>
[2021-12-09 15:08:05,087][tasks       ][INFO    ] `payload` of type <class 'list'>: [['MA', 'Mastercard', 3.856085884230387], ['GM', 'General Motors', 3.194377894904976], ['LH', 'LabCorp', 2.6360513469535274], ['CCI', 'Crown Castle', 2.491537650518838], ['DHI', 'D. R. Horton', 2.451821208757954], ['PEAK', 'Healthpeak Properties', 2.3698069046225845], ['BDX', 'Becton Dickinson', 2.355495473352187], ['DD', 'DuPont', 2.19100399536023], ['HLT', 'Hilton Worldwide', 1.9683928319458088], ['JKHY', 'Jack Henry & Associates', 1.7102615694164935]]
[2021-12-09 15:08:05,090][index       ][INFO    ] `result.get()` of type <class 'NoneType'>: None
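In the worker output, the task logs payload and then Celery reports "succeeded in 3.8531467270004214s: None". Celery's success message prints the task's return value, so the worker itself recorded None, and result.get() (logged at the same timestamp, right after the task finished) is returning what was stored. One common cause in Flask projects, offered here only as an assumption because the Celery app setup isn't shown, is a custom base task class (often called ContextTask) whose __call__ runs the task inside an app context but never returns self.run(...). The sketch below is pure Python with stand-in classes so it runs without a broker; Task, app_context and make_gainers_task are hypothetical names, not Celery's API.

```python
from contextlib import nullcontext


def app_context():
    """Stand-in for Flask's app.app_context(); a real app would push a context here."""
    return nullcontext()


class Task:
    """Tiny stand-in for celery.Task: the worker stores whatever calling the task returns."""

    def run(self, *args, **kwargs):
        raise NotImplementedError

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


class BrokenContextTask(Task):
    def __call__(self, *args, **kwargs):
        with app_context():
            self.run(*args, **kwargs)  # BUG: return value dropped, so None is stored


class FixedContextTask(Task):
    def __call__(self, *args, **kwargs):
        with app_context():
            return self.run(*args, **kwargs)  # the return value reaches the result backend


def make_gainers_task(base):
    """Build a task on the given base class whose run() returns a payload like the question's."""

    class GainersTask(base):
        def run(self):
            return [["MA", "Mastercard", 3.856085884230387]]

    return GainersTask()


print(make_gainers_task(BrokenContextTask)())  # None
print(make_gainers_task(FixedContextTask)())   # [['MA', 'Mastercard', 3.856085884230387]]
```

If the project does define such a base class, the fix in a real setup would be the single return inside its __call__.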

Answers (1)

ItayB

It seems your task takes ~4 seconds and you're not waiting for it to finish. You can wait for it like this:

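import logging
from time import sleep

logger = logging.getLogger(__name__)


def _top_ten_gainers():
    from project.tasks import get_top_ten_gainers
    result = get_top_ten_gainers.delay()
    # Poll the result backend once per second until the task has finished
    while not result.ready():
        sleep(1)
    gains = result.get()
    logger.info("Task results: {}".format(gains))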
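The polling loop above waits indefinitely if the worker never finishes the task. A bounded variant is sketched here; wait_for_result is a hypothetical helper that relies only on the ready() and get() methods that AsyncResult provides, and its timeout and interval defaults are arbitrary. AsyncResult.get() also blocks on its own and accepts a timeout argument (raising celery.exceptions.TimeoutError when it expires), so result.get(timeout=10) is a shorter alternative to a hand-written loop.

```python
import time


def wait_for_result(result, timeout=10.0, interval=0.5):
    """Poll result.ready() until it is True, then return result.get().

    result is assumed to behave like celery.result.AsyncResult (ready() and get()).
    Raises the builtin TimeoutError if the task is not ready within timeout seconds.
    """
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task not ready after {} seconds".format(timeout))
        time.sleep(interval)
    return result.get()
```

Using time.monotonic() for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.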
