Reputation: 131
I'm trying to implement a Redis queue in the current system. The job is sent to another module, and the caller should wait until the job is done and has returned its result, job.result, then move on:
with Connection(redis_connection):
    job = job_queue.enqueue(worker_func, func_input1, func_input2)

print("waiting for result")
print(datetime.datetime.now())
while job.result is None:
    pass
print(datetime.datetime.now())
print("got result")

# next step
next_step_func(job.result)
...
I'm facing 2 issues here:
1. The while job.result is None loop is taking a very long time. The processing in worker_func takes about 2-3 seconds, which involves calling APIs on another server, yet the busy wait while job.result is None itself takes another >= 3 seconds, adding up to >= 5 seconds in total. I am positive that the extra time is spent in while job.result is None, because I added logs for both worker_func and the while job.result is None loop:
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT start work
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:57.601189
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.075137
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT end work
...
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT waiting for result
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:53.704891
2021-07-12T18:57:59.09+0800 [APP/PROC/WEB/0] OUT 2021-07-12 10:57:59.096009
As you can see above, the busy-wait while loop only ends after worker_func is done.
2. Is there a more elegant way to implement this synchronous wait than a busy loop? I think the busy loop is definitely not the best implementation, as it will consume a lot of CPU resources.
Thanks!
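The timestamps in the log excerpt above can be turned into durations to see where the roughly 5 seconds go. The sketch below assumes that the two timestamps between "start work" and "end work" are the start and end of worker_func, that the two after "waiting for result" are the start and end of the busy loop, and that both processes share the same clock; the variable names are made up for illustration.

```python
from datetime import datetime

# Timestamps copied from the log excerpt (printed with datetime.datetime.now()).
# Assumption: the first pair brackets worker_func, the second pair brackets the loop.
work_start = datetime.fromisoformat("2021-07-12 10:57:57.601189")
work_end = datetime.fromisoformat("2021-07-12 10:57:59.075137")
wait_start = datetime.fromisoformat("2021-07-12 10:57:53.704891")
wait_end = datetime.fromisoformat("2021-07-12 10:57:59.096009")

queue_delay = (work_start - wait_start).total_seconds()  # enqueue -> worker starts
work_time = (work_end - work_start).total_seconds()      # worker_func itself
notice_delay = (wait_end - work_end).total_seconds()     # worker done -> loop exits
total_wait = (wait_end - wait_start).total_seconds()     # whole busy wait

print(f"enqueue -> worker start: {queue_delay:.3f} s")   # 3.896 s
print(f"worker_func run time:    {work_time:.3f} s")     # 1.474 s
print(f"worker end -> loop exit: {notice_delay:.3f} s")  # 0.021 s
print(f"total wait:              {total_wait:.3f} s")    # 5.391 s
```

Under those assumptions, the loop exits about 20 ms after worker_func ends, and most of the extra time (about 3.9 s) passes between enqueueing the job and the worker starting it.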
Edit, to give clearer context: I need the value of next_step_func(job.result) to be returned from where job_queue.enqueue is called, so a clearer structure would be:
def endpoint():
    with Connection(redis_connection):
        job = job_queue.enqueue(worker_func, func_input1, func_input2)

    print("waiting for result")
    print(datetime.datetime.now())
    while job.result is None:
        pass
    print(datetime.datetime.now())
    print("got result")

    # next step
    return next_step_func(job.result)
...
So the pain point is that I need job.result to be returned in endpoint(), yet the job callback moves my job into a different context at on_success.
Upvotes: 0
Views: 2959
Reputation: 338128
The documentation suggests job callbacks as one option:
def job_succeeded(job, connection, result, *args, **kwargs):
    next_step_func(job.result)

def job_failed(job, connection, type, value, traceback):
    # react to the error
    pass

with Connection(redis_connection):
    args = (func_input1, func_input2)
    job_queue.enqueue(worker_func, args=args, on_success=job_succeeded, on_failure=job_failed)
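If the result has to be returned from the calling function itself, as in endpoint() in the question, the callbacks do not help directly, because they run on the worker side. A minimal sketch of a blocking wait that sleeps between polls instead of spinning is shown below. It assumes the job object exposes is_finished, is_failed and result the way an RQ Job does; wait_for_result, JobFailed and the default timeout and poll_interval values are hypothetical names and numbers chosen for illustration.

```python
import time


class JobFailed(Exception):
    """Raised when the polled job reports that it failed (hypothetical helper)."""


def wait_for_result(job, timeout=30.0, poll_interval=0.1):
    """Block until `job` finishes, sleeping between polls instead of spinning.

    Assumes `job` has `is_finished`, `is_failed` and `result` attributes,
    as an RQ Job does. Raises TimeoutError if the deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Check the status rather than `result is None`, so a job that
        # legitimately returns None does not keep the caller waiting.
        if job.is_finished:
            return job.result
        if job.is_failed:
            raise JobFailed("job failed before producing a result")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job not finished after {timeout} seconds")
        # Yield the CPU between polls; this is what the busy loop lacks.
        time.sleep(poll_interval)
```

In endpoint() this would replace the while loop, for example return next_step_func(wait_for_result(job, timeout=10)). The trade-off is latency: the caller may notice the result up to poll_interval later than a busy loop would, in exchange for not burning a CPU core while waiting.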
Upvotes: 2