Korne127
Korne127

Reputation: 332

How to properly use multiprocessing module with Django?

I'm having a python 3.8+ program using Django and Postgresql which requires multiple threads or processes. I cannot use threads since the GLI will restrict them to a single process which results in an awful performance (especially since most of the threads are CPU bound).

So the obvious solution was to use the multiprocessing module. But I've encountered several problems:

  1. When using spawn to generate new processes, I get the "Apps aren't loaded yet" error when the new process imports the Django models. This is because the new process doesn't have the database connection given to the main process by python manage.py runserver. I circumvented it by using fork instead of spawn (like advised here) so the connections are copied to the other processes but I feel like this is not the best solution and there should be a clean way to start new processes with the necessary connections.

  2. When several of the processes simultaneously access the database, sometimes false results are given back (partly even from wrong models / relations) which crashes the program. This can happen in the initial startup when fetching data but also when the program is running. I tried to use ISOLATION LEVEL SERIALIZABLE (like advised here) by adding it in the options in the database settings but that didn't work.
    A possible solution might be using custom locks that are given to every process but that doesn't feel like a good solution as well.

So in general, the question is: Is there a good and clean way to use multiprocessing in Django without these issues? A way that new processes have the database connections without needing to rely on fork and that all processes can just access the database without having any race conditions sometimes producing false results like this?

One important thing: I don't use a Pool since the processes aren't running the same simple task. The processes are each running different specific tasks, share data via multiprocessing Signals, Queues, Values and Namespaces (shared memory) and new processes can be triggered by user interaction (websockets).
I've tried to look into Celery since this has been recommended on a lot of questions about Django and multiprocessing but I wouldn't know how to use something like that in the project structure with the specific different processes that need to be created at specific points and the data that gets transferred over the Queues, Signals, Values and Namespaces in the existing project.

Thank you for reading; any help is appreciated!

Upvotes: 3

Views: 1375

Answers (2)

Jimmy Engelbrecht
Jimmy Engelbrecht

Reputation: 723

However, it still throws errors like django.db.utils.OperationalError: lost synchronization with server: got message type "1", length 976434746

We need to close the old connection to db, which Django had automatically created before. We can do this by executing the function before we start a new process (or thread):

django.db.close_old_connections()

Django automatically creates new connections after closing previous connections.

Upvotes: 0

AKX
AKX

Reputation: 169051

With every new process, a setup function calling Django.setup() is first called before executing the real function. My hope was that with this way, every process would create an independent connection to the database so that the current system could work.

Yes - you can do that with initializer, as explained in my other answer from yesteryear.

However, it still throws errors like django.db.utils.OperationalError: lost synchronization with server: got message type "1", length 976434746

That means you're using the fork start method for subprocesses, and any database connections and their state has been forked into the subprocesses too, and they will be out of sync when used by multiple processes.

You'll need to close them:

def subprocess_setup():
    django.setup()
    from django.db import connections
    for conn in connections.all():
        conn.close()
    
with ProcessPoolExecutor(max_workers=5, initializer=subprocess_setup) as executor:
   

Upvotes: 3

Related Questions