Adil Shirinov

Reputation: 135

Celery custom task state is not recognized

I have a Celery task that creates a CSV file. While the file is being written, I want to set a custom state on the task so that I can display its progress on the frontend.

Here is my task from tasks.py:

@app.task(bind=True)
def make_csv_file(self, schema_id, rows):
    schema = SchemaModel.objects.get(id=schema_id)
    salt = str(uuid.uuid4())
    print(f"salt is {salt}")
    try:
        columns = [
            column for column in schema.column_in_schemas.all().order_by("order")
        ]
        path = os.path.join(settings.MEDIA_ROOT, f"schema-{schema.id}---{salt}.csv")
        f = open(path, "w")
        f.truncate()
        csv_writer = csv.writer(f)
        csv_writer.writerow([column.name for column in columns]) # writerow?
        for x in range(rows):
            row_to_write = list()
            for column in columns:
                row_to_write.append(" ".join(generate_csv_data(column.id).split()))
            csv_writer.writerow(
                row_to_write
            )
            # the line below is supposed to set a custom state on the task
            # https://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-states
            if not self.request.called_directly:
                self.update_state(state='PROGRESS', meta={'done': x, 'total': rows})
        schema.files.create(file=path, status=SchemaModelStatusChoices.READY)
        print("created")
    except Exception as exc:
        print(exc)
        schema.files.create(file='', status=SchemaModelStatusChoices.FAILED)
    finally:
        schema.save()

Here is my views.py:

def view_schema(request, pk):
    schema = SchemaModel.objects.get(pk=pk)
    form = RowsForm(request.POST or None)
    context = {
        'schema': schema,
        'form': form,
    }

    if request.method == 'POST':
        rows = int(request.POST.get('rows'))
        job = make_csv_file.apply_async((schema.id, rows))
        request.session['task_id'] = job.id # here I store the task id so that I can get its state from the view below
        return redirect('schemas:view_schema', pk=pk)
    return render(request, 'schemas/view_schema.html', context)

@csrf_exempt
def poll_state(request):
    data = 'Fail'
    if request.is_ajax():
        if 'task_id' in request.POST.keys() and request.POST['task_id']:
            task_id = request.POST['task_id']
            task = AsyncResult(task_id)
            #data = task.result or task.state
            data = task.state # The state here is always SUCCESS
        else:
            data = 'No task_id in the request'
    else:
        data = 'This is not an ajax request'
    json_data = json.dumps(data)
    return HttpResponse(json_data)

Here are my Celery-related settings from settings.py:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
CELERYD_STATE_DB = os.path.join(BASE_DIR, "celery-temp")
CELERY_IGNORE_RESULT = False
CELERY_TRACK_STARTED = True

The file gets created and the task executes, but the custom state does not work. I have been stuck on this problem for a day and have no idea how to solve it. If you need more code, please leave a comment under the post.

UPD: I found one issue with the code: I was writing the task id to the session but never deleted the old one. Now the problem is that AsyncResult(task_id).state in the poll_state function only ever returns SUCCESS.

Upvotes: 0

Views: 674

Answers (1)

uroboros

Reputation: 174

Did you get this working?

Calling AsyncResult(task_id)._get_task_meta() worked for me.
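For illustration, here is a minimal sketch of how the dict returned by that call might be turned into something the frontend can use. It assumes the dict has 'status' and 'result' keys, and that the meta passed to update_state(state='PROGRESS', meta={'done': ..., 'total': ...}) in the question ends up under 'result'. progress_payload is a hypothetical helper name, not part of Celery, and _get_task_meta() is a private method, so its behavior may differ between Celery versions.

```python
import json


def progress_payload(meta):
    """Build a small JSON-serializable payload from a task-meta dict.

    Assumption: `meta` looks like {'status': ..., 'result': ...}, where
    'result' holds the custom meta dict while the state is PROGRESS.
    """
    state = meta.get("status", "PENDING")
    result = meta.get("result")
    payload = {"state": state}
    if state == "PROGRESS" and isinstance(result, dict):
        done = result.get("done", 0)
        total = result.get("total", 0)
        payload["done"] = done
        payload["total"] = total
        # Guard against a zero total to avoid a division error.
        payload["percent"] = int(100 * done / total) if total else 0
    return payload


# Example with a hand-written dict standing in for the real task meta.
example = progress_payload({"status": "PROGRESS", "result": {"done": 5, "total": 20}})
json_data = json.dumps(example)
```

In poll_state, the same helper could be given AsyncResult(task_id)._get_task_meta() instead of the hand-written dict, and the result passed to json.dumps before returning the HttpResponse.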

Upvotes: 1
