Ariel
Ariel

Reputation: 3611

How can I use uWSGI Spooler in Django?

I'm trying to run concurrent tasks using uWSGI Spooler instead of Celery in Django 3.2.3, Python 3.7.9. I have found a few resources like this, this, and this, but nothing works. I have encountered many errors in this journey, and I have fixed them using solutions I found online, and right now this is what I have:

Setup

uwsgi.ini

[uwsgi]

pythonpath          = /path/to/djproj
wsgi-file           = /path/to/djproj/wsgi.py
uid                 = myuid
module              = wsgi:application

master              = true
processes           = 1
threads             = 10
lazy-apps           = true
http                = 0.0.0.0:8080

vacuum              = true
log-format          = %(ltime) Worker: %(wid) %(status) %(method) %(uri) Size: %(size)
log-date            = %%Y %%m %%d %%H:%%M:%%S.000
# Let django handle most of the logging
disable-logging     = true
log-5xx             = true

harakiri            = 60
harakiri-verbose    = true

stats               = /tmp/djproj_stats.socket

# Spooling
spooler             = /path/to/tasks
spooler-harakiri    = 600
import              = djproj.tasks

tasks.py

import logging

logger = logging.getLogger(__name__)

try:
    from uwsgidecorators import spool

    logger.warning("Imported spool successfully.")
except Exception:
    logger.warning("Couldn't import spool.")

    def spool(func):
        def func_wrapper(**arguments):
            return func(arguments)

        return func_wrapper


@spool
def run_task(arguments):
    logger.warning("Running in spool.")
    from djproj.myapp.models import MyModel

    obj = MyModel.objects.get(id=arguments["obj_id"])
    obj.run()

djproj/myapp/models.py

# ...

def prepare_spooler_args(**kwargs):
    args = {}
    for name, value in kwargs.items():
        args[name.encode("utf-8")] = str(value).encode("utf-8")
    return args

class MyModel(models.Model):
    # ...
    def start_run_in_spooler(self):
        args = prepare_spooler_args(task_id=self.id)
        run_task(args)

Results

When I run uwsgi --ini uwsgi.ini, and access the endpoint that triggers this code, I get:

...
2021 09 20 12:56:20.000 - *** Stats server enabled on /tmp/djproj_stats.socket fd: 16 ***
2021 09 20 12:56:20.000 - spawned uWSGI http 1 (pid: 919)
2021 09 20 12:56:20.000 - [spooler /path/to/tasks pid: 917] managing request uwsgi_spoolfile_on_5a22c167ad32_826_2_189168444_1632124468_886229 ...
2021 09 20 12:56:20.000 - unable to find the spooler function, have you loaded it into the spooler process ?

I find it very strange that there are so few resources online on how to make this work. Every time I look for solutions everybody recommends Celery like it's a silver bullet for concurrency in any Python app, even though Django + uWSGI is a very common combination, and the Spooler seems like it would be a simple and lightweight solution. If anybody has any tips on how to get this working it would be awesome.

Upvotes: 4

Views: 1132

Answers (1)

Ariel
Ariel

Reputation: 3611

With the help of a colleague we finally managed to make it work. Here are the changes that were necessary:

uwsgi.ini

import = djproj.tasks was changed to spooler-import = djproj.tasks.

tasks.py

Imports had to be moved out of the spooled function, and Django needed to be initialized for the imports to work. Here's the final version of the code:

import logging
from functools import wraps

import django

logger = logging.getLogger(__name__)

try:
    from djproj.myapp.models import MyModel
except Exception:
    logger.warning(f"Django is not loaded yet! Setting up...")
    try:
        django.setup(set_prefix=False)
    except Exception:
        pass
    from djproj.myapp.models import MyModel

try:
    from uwsgidecorators import spool
except Exception:
    logger.warning("Couldn't import uwsgidecorators!")

    def spool(pass_arguments):
        def decorator(method):
            if callable(pass_arguments):
                method.gw_method = method.__name__
            else:
                method.gw_method = pass_arguments

            @wraps(method)
            def wrapper(*args, **kwargs):
                method(*args, **kwargs)

            return wrapper

        if callable(pass_arguments):
            return decorator(pass_arguments)
        return decorator


@spool(pass_arguments=True)
def run_task(task_id):
    task = MyModel.objects.get(id=task_id)
    task.run()

models.py and views.py

The model method that called the spooled function was removed. Instead we just call the spooled function directly in the view:

class MyModelViewSet(viewsets.ModelViewSet):
    queryset = models.MyModel.objects.all()
    serializer_class = serializers.MyModelSerializer

    def create(self, request, *args, **kwargs):
        # ...

        run_task(task_id=task.id)

        # ...
        return Response(serializer.data, status=status_code, headers=headers)

Hope this is useful for anyone facing similar issues.

Upvotes: 4

Related Questions