Reputation: 315
I am currently working on a Flask application (Python 3.6) and want to integrate Celery because I have multiple long-running background tasks.
Edit: Celery 4.1
The integration itself was no problem and the Celery tasks execute correctly, but I cannot access the current state of a running task.
Celery, Flask setup:
def make_celery(app):
    celery = Celery(app.import_name,
                    backend=app.config["result_backend"],
                    broker=app.config["broker_url"])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config["broker_url"] = "redis://localhost:6379"
app.config["result_backend"] = "redis://localhost:6379"
app.config["DB"] = "../pyhodl.sqlite"
celery_app = make_celery(app)
Celery task:
@celery_app.task(bind=True, name="server.tasks.update_trading_pair")
def update_trading_pair(self, exchange, currency_a, currency_b):
    print(exchange, currency_a, currency_b)
    time.sleep(50)
Call the task and store the value in a dictionary:
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
print("NEW TASK")
print(task_id)
id = exchange_mnemonic + "_" + currency_a + "_" + currency_b
TASK_STATES[id] = task_id
Get state of a task:
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
print(result.state)
print(result) # works but only prints the task_id
This is where the error is raised. When I print only the result object, it simply prints the task_id. But if I try to retrieve the current state, it raises the following exception:
TypeError: sequence item 1: expected a bytes-like object, AsyncResult found
Upvotes: 3
Views: 3277
Reputation: 2399
EXPLANATION :
When you call your task :
task_id = update_trading_pair.delay(exchange, currency_a, currency_b)
your variable task_id is an instance of AsyncResult, not a string.
So the value stored in TASK_STATES[market.__id__()] is also an AsyncResult instance, whereas it should be a string.
You are then trying to instantiate an AsyncResult object with it:
result = update_trading_pair.AsyncResult(TASK_STATES[market.__id__()])
So you are instantiating an AsyncResult object with another AsyncResult object, whereas it should be instantiated with a string (the task ID).
The confusion may come from your print(task_id), which shows you a string. Under the hood, print calls the __str__ method of the AsyncResult object, which looks like this in the Celery source code:
def __str__(self):
    """`str(self) -> self.id`."""
    return str(self.id)
It just returns the id attribute of your task_id object, so printing the object shows only the ID.
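To make the difference between "prints like a string" and "is a string" concrete, here is a minimal sketch that needs no Celery installation. FakeAsyncResult is a hypothetical stand-in written for this illustration, not the real class, and the task ID is made up. The bytes join at the end is only an assumption about how a result backend might build its storage key; it is included because it fails with the same kind of TypeError as in the question.

```python
class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult (illustration only)."""

    def __init__(self, task_id):
        self.id = task_id

    def __str__(self):
        # Same idea as the __str__ quoted above: expose only the id.
        return str(self.id)


handle = FakeAsyncResult("d3b07384")  # made-up task id

printed = str(handle)                      # what print(handle) would display
is_plain_string = isinstance(handle, str)  # the object itself is not a str

# Assumed backend behaviour: joining byte strings to form a key.
# A non-bytes item in the sequence makes the join raise TypeError.
try:
    b"".join([b"task-meta-", handle])
    join_error = None
except TypeError as exc:
    join_error = str(exc)

print(printed)
print(is_plain_string)
print(join_error)
```

The object prints as its ID, yet any code that needs a real str or bytes value rejects it.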
SOLUTION :
You can fix it either by storing the ID string when you create the task, TASK_STATES[id] = task_id.id, or by converting the stored object to a string when you look it up:
result = update_trading_pair.AsyncResult(str(TASK_STATES[market.__id__()]))
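The first option (store the ID, not the object) could be wrapped in two small helpers, sketched below. remember_task and stored_task_id are hypothetical names introduced here, FakeAsyncResult again stands in for the object returned by .delay(), and the keys and IDs are invented. The sketch only assumes that the result object exposes an id attribute, as the __str__ method above shows.

```python
TASK_STATES = {}  # maps a market key to a task id string


class FakeAsyncResult:
    """Hypothetical stand-in for the object returned by .delay()."""

    def __init__(self, task_id):
        self.id = task_id


def remember_task(key, result):
    """Store only the id string, never the result object itself."""
    TASK_STATES[key] = str(result.id)


def stored_task_id(key):
    """Return the id as a str, tolerating entries saved as result objects."""
    value = TASK_STATES[key]
    return value if isinstance(value, str) else str(value.id)


# Preferred path: the id is stored as a plain string.
remember_task("kraken_BTC_EUR", FakeAsyncResult("d3b07384"))

# Legacy path: an entry that still holds the whole result object.
TASK_STATES["bitfinex_ETH_USD"] = FakeAsyncResult("8c7dd922")

first_id = stored_task_id("kraken_BTC_EUR")
second_id = stored_task_id("bitfinex_ETH_USD")
print(first_id, second_id)
```

With real Celery, the string returned by stored_task_id is what you would pass to update_trading_pair.AsyncResult(...) before reading result.state.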
Upvotes: 3