Daniel Qiao

Reputation: 131

How can I elegantly wait until a job in a Redis queue is done, without a busy wait?

I'm trying to integrate a Redis queue into the current system. The job is sent to another module, and the caller should wait until the job is done and has returned its result, job.result, before moving on:

with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)

print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
    pass
print(datetime.datetime.now())
print("got result")

# next step
next_step_func(job.result)

...

I'm facing 2 issues here:

  1. The busy wait, while job.result is None, takes a very long time. The processing in worker_func takes about 2-3 seconds (it calls APIs on another server), yet the busy wait itself adds another 3 seconds or more, for a total of at least 5 seconds. I am positive that the waiting happens after worker_func has finished, because I added logs to both worker_func and the while job.result is None loop:
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
   ...
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
   2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009

As you can see above, the busy-wait loop runs after worker_func is done.

  2. Is there a more elegant way to implement this synchronous wait instead of a busy loop? I think the busy loop is definitely not the best implementation, as it consumes a lot of CPU.
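A minimal sketch of a less wasteful wait, assuming the job object behaves like an RQ Job: get_status() re-reads the status from Redis and returns something comparable to strings such as "finished" and "failed", and job.result holds the return value once the job has finished. The names wait_for_job and JobFailed are hypothetical. It still polls, but it sleeps between checks with a capped exponential backoff and gives up after a timeout, so the process is idle most of the time instead of spinning.

```python
import time


class JobFailed(Exception):
    """Hypothetical error raised when the job ends in a non-success state."""


def wait_for_job(job, timeout=30.0, interval=0.05, max_interval=1.0):
    """Wait for an RQ-style job without spinning the CPU.

    Assumes job.get_status() fetches the current status from Redis and
    compares equal to lowercase status strings ("finished", "failed", ...).
    """
    deadline = time.monotonic() + timeout
    while True:
        status = job.get_status()
        if status == "finished":
            return job.result
        if status in ("failed", "stopped", "canceled"):
            raise JobFailed(f"job ended with status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job not finished after {timeout} s")
        # Sleep instead of `pass`: the process is idle between checks, and
        # each check is one Redis round trip instead of thousands per second.
        time.sleep(interval)
        interval = min(interval * 2, max_interval)  # exponential backoff, capped
```

In endpoint() the loop would then become return next_step_func(wait_for_job(job)). Checking the status rather than job.result is None also means a job that legitimately returns None does not hang the caller; the trade-off is up to max_interval of extra latency after the job finishes.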

Thanks!

Edit: adding more context to the code above.

I need the value of next_step_func(job.result) to be returned from the function where job_queue.enqueue is called, so a clearer structure would be:

def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)

    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")

    # next step
    return next_step_func(job.result)

...

So the pain point is that I need job.result to be returned from endpoint(), yet a job callback (on_success) runs in a different context, away from endpoint().

Upvotes: 0

Views: 2959

Answers (1)

Tomalak

Reputation: 338128

The documentation suggests using job callbacks as one option:

def job_succeeded(job, connection, result, *args, **kwargs):
    # Called by the worker after worker_func returns successfully;
    # result is worker_func's return value.
    next_step_func(result)

def job_failed(job, connection, type, value, traceback):
    # Called by the worker if worker_func raises: react to the error here.
    pass

with Connection(redis_connection):
    args = (func_input1, func_input2)
    job_queue.enqueue(worker_func, args=args, on_success=job_succeeded, on_failure=job_failed)
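Since the callbacks run in the worker rather than in endpoint(), one way to get the result back to the caller without polling is to let the success callback push it onto a per-job Redis list and have the caller block on BLPOP. This is a sketch, not RQ's own mechanism: push_result, wait_for_result, result_key and RESULT_TTL_SECONDS are hypothetical names, the result is assumed to be JSON-serialisable, connection is assumed to be a redis-py client, and the worker must be able to import push_result.

```python
import json

RESULT_TTL_SECONDS = 60  # hypothetical: how long an unread result is kept


def result_key(job_id):
    # Hypothetical naming scheme; any key unique per job works.
    return f"job-result:{job_id}"


def push_result(job, connection, result, *args, **kwargs):
    """on_success callback: runs in the worker after worker_func returns and
    hands the result back by pushing it onto a per-job Redis list."""
    key = result_key(job.id)
    connection.rpush(key, json.dumps(result))  # assumes a JSON-serialisable result
    connection.expire(key, RESULT_TTL_SECONDS)  # avoid leaking keys nobody reads


def wait_for_result(connection, job_id, timeout=30):
    """Block inside Redis (BLPOP) until push_result runs; no loop on our side.
    The value sits in a list, so it is not lost if the worker finishes
    before this call starts waiting."""
    item = connection.blpop([result_key(job_id)], timeout=timeout)
    if item is None:  # redis-py returns None when BLPOP times out
        raise TimeoutError(f"no result for job {job_id} within {timeout} s")
    _key, payload = item
    return json.loads(payload)
```

In endpoint() this could look like job = job_queue.enqueue(worker_func, args=(func_input1, func_input2), on_success=push_result) followed by return next_step_func(wait_for_result(redis_connection, job.id)). A failure callback could push an error marker onto the same key so the caller does not have to wait for the full timeout.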

Upvotes: 2
