Prashant Gaur

Reputation: 9828

Twisted reactor not running properly inside Celery

System/Dependencies details:

CPU --> 4
requirements --> celery==4.3.0, twisted==19.7.0, python3.7

Below is my Celery setup:

from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor, defer
from twisted.web.error import Error
from celery import signals

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('started new thread')

@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
    """
    This is invoked when the individual workers shut down. It just stops the twisted reactor
    @param kwargs:
    @return:
    """
    reactor.callFromThread(reactor.stop)
    print('REACTOR SHUTDOWN')

def getPage(inp):
    print(inp)
    return inp

def inThread():
    print('inside inthread method')
    try:
        result = threads.blockingCallFromThread(
            reactor, getPage, "http://twistedmatrix.com/")
    except Exception as exc:
        print(exc)
    else:
        print(result)


@app.task
def add(x, y):
    print('inside add method')
    inThread()
    return x + y

I start the Celery worker like this:

celery -A run worker --loglevel=info

Logs when Celery starts:

(2_env) ubuntu@gpy:~/app/env/src$ celery -A run worker --loglevel=info

[tasks]
  . run.add

[2020-04-09 07:25:29,357: WARNING/Worker-1] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-4] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-3] started new thread
[2020-04-09 07:25:29,364: WARNING/Worker-2] started new thread
[2020-04-09 07:25:29,367: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

I call the task like this:

>>> run.add.delay(1,2)
<AsyncResult: d41680fd-7cc1-4e75-81be-6496bad0cc16>
>>> 

Sometimes it works fine:

[2020-04-09 07:27:17,998: INFO/MainProcess] Received task: run.add[00934769-48c4-48b8-852c-8b746bdd5e03]
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside add method
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside inthread method
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: INFO/MainProcess] Task run.add[00934769-48c4-48b8-852c-8b746bdd5e03] succeeded in 0.00144551398989s: 3

Other times it never calls the getPage method and hangs, as in the logs below:

[2020-04-09 07:27:22,318: INFO/MainProcess] Received task: run.add[d41680fd-7cc1-4e75-81be-6496bad0cc16]
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside add method
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside inthread method

Is there any issue with calling reactor.run inside a Thread?

UPDATE

I added print statements to *twisted.internet.threads.blockingCallFromThread*:

def blockingCallFromThread(reactor, f, *a, **kw):
    queue = Queue.Queue()
    def _callFromThread():
        print('inside _callFromThread')
        result = defer.maybeDeferred(f, *a, **kw)
        result.addBoth(queue.put)
    print('before calling _callFromThread')
    reactor.callFromThread(_callFromThread)
    print('after calling _callFromThread')
    result = queue.get()
    if isinstance(result, failure.Failure):
        result.raiseException()
    return result

I can see that the Celery worker hangs only when the _callFromThread function is never invoked after reactor.callFromThread(_callFromThread). When I manually stop the worker with CTRL+C, I can see that it finally gets called.

Every time I stop a worker whose job was hung, it starts processing the job.
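To make the hang easier to reason about, here is a simplified, standard-library-only model of the handoff that blockingCallFromThread performs. It is not Twisted's implementation: MiniLoop and blocking_call are hypothetical names, and the timeout is added only so the demo cannot block forever. It shows that the calling thread waits on a queue until the loop thread actually runs the queued function, so a loop that never wakes up looks exactly like a hung task.

```python
import queue
import threading


class MiniLoop:
    """Toy stand-in for a reactor: runs queued callables on a single thread."""

    def __init__(self):
        self._calls = queue.Queue()

    def call_from_thread(self, f):
        # Only queues the call; nothing runs until run() picks it up.
        self._calls.put(f)

    def run(self):
        while True:
            f = self._calls.get()
            if f is None:  # sentinel queued by stop()
                break
            f()

    def stop(self):
        self._calls.put(None)


def blocking_call(loop, f, *args, timeout=None):
    """Run f(*args) on the loop thread and block until its result arrives."""
    result = queue.Queue()

    def _call():
        try:
            result.put((True, f(*args)))
        except Exception as exc:
            result.put((False, exc))

    loop.call_from_thread(_call)
    # Raises queue.Empty if the loop never ran _call within the timeout.
    ok, value = result.get(timeout=timeout)
    if not ok:
        raise value
    return value


def demo():
    loop = MiniLoop()

    # The loop is not running yet, so the caller would wait forever
    # without a timeout.
    try:
        blocking_call(loop, str.upper, "x", timeout=0.1)
        hung = False
    except queue.Empty:
        hung = True

    # Once the loop thread is running, the same kind of call completes.
    loop_thread = threading.Thread(target=loop.run)
    loop_thread.start()
    value = blocking_call(loop, str.upper, "ok", timeout=2)
    loop.stop()
    loop_thread.join()
    return hung, value
```

In this model, calling demo() first times out and then succeeds, which matches the observation that the queued function only runs once something gets the loop thread to process its pending calls.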

Update: 27 April 2020

The problem was solved by using crochet to run the Twisted reactor. I updated the function below:

@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    from crochet import setup
    setup()
    print('started new thread')

Upvotes: 2

Views: 655

Answers (1)

Jean-Paul Calderone

Reputation: 48345

With some care, which you seem to have taken, you can run the Twisted reactor in one thread. However, you cannot run it in more than one thread, which I suppose is what is happening when you use it with Celery. The reactor has both instance and global state, which will get stomped on if it is run in more than one thread.

Instead, try using crochet to coordinate calls onto the reactor running in a single non-main thread from as many other threads as you like.

Upvotes: 1
