Reputation: 9828
System/dependency details:
CPU --> 4
requirements --> celery==4.3.0, twisted==19.7.0, python3.7
Below is my Celery setup:
from threading import Thread
from celery import Celery
from twisted.internet import threads, reactor, defer
from twisted.web.error import Error
from celery import signals

app = Celery('tasks', broker='pyamqp://guest@localhost//')


@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    Thread(target=reactor.run, name="reactor.run", args=(False,)).start()
    print('started new thread')


@signals.worker_process_shutdown.connect()
def shutdown_reactor(**kwargs):
    """
    This is invoked when the individual workers shut down. It just stops the twisted reactor
    @param kwargs:
    @return:
    """
    reactor.callFromThread(reactor.stop)
    print('REACTOR SHUTDOWN')


def getPage(inp):
    print(inp)
    return inp


def inThread():
    print('inside inthread method')
    try:
        result = threads.blockingCallFromThread(
            reactor, getPage, "http://twistedmatrix.com/")
    except Exception as exc:
        print(exc)
    else:
        print(result)


@app.task
def add(x, y):
    print('inside add method')
    inThread()
    return x + y
I run the Celery worker like this:
celery -A run worker --loglevel=info
Logs when Celery starts:
(2_env) ubuntu@gpy:~/app/env/src$ celery -A run worker --loglevel=info
[tasks]
. run.add
[2020-04-09 07:25:29,357: WARNING/Worker-1] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-4] started new thread
[2020-04-09 07:25:29,362: WARNING/Worker-3] started new thread
[2020-04-09 07:25:29,364: WARNING/Worker-2] started new thread
[2020-04-09 07:25:29,367: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
I call the task like this:
>>> run.add.delay(1,2)
<AsyncResult: d41680fd-7cc1-4e75-81be-6496bad0cc16>
>>>
Sometimes it works fine:
[2020-04-09 07:27:17,998: INFO/MainProcess] Received task: run.add[00934769-48c4-48b8-852c-8b746bdd5e03]
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside add method
[2020-04-09 07:27:17,999: WARNING/Worker-4] inside inthread method
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: WARNING/Worker-4] http://twistedmatrix.com/
[2020-04-09 07:27:18,000: INFO/MainProcess] Task run.add[00934769-48c4-48b8-852c-8b746bdd5e03] succeeded in 0.00144551398989s: 3
Sometimes getPage is never called and the task hangs, as in the logs below:
[2020-04-09 07:27:22,318: INFO/MainProcess] Received task: run.add[d41680fd-7cc1-4e75-81be-6496bad0cc16]
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside add method
[2020-04-09 07:27:22,319: WARNING/Worker-2] inside inthread method
Is there any issue with running reactor.run inside a Thread?
UPDATE
I added print statements to twisted.internet.threads.blockingCallFromThread:
def blockingCallFromThread(reactor, f, *a, **kw):
    queue = Queue.Queue()
    def _callFromThread():
        print('inside _callFromThread')
        result = defer.maybeDeferred(f, *a, **kw)
        result.addBoth(queue.put)
    print('before calling _callFromThread')
    reactor.callFromThread(_callFromThread)
    print('after calling _callFromThread')
    result = queue.get()
    if isinstance(result, failure.Failure):
        result.raiseException()
    return result
The Celery worker hangs only when _callFromThread is never invoked after reactor.callFromThread(_callFromThread) has scheduled it. When I stop the worker manually with Ctrl+C, I can see it finally get called.
Every time I stop a worker where a job was hung, the job starts processing.
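To illustrate the hang described above, here is a minimal standard-library model of the same pattern. It is not Twisted's implementation: run_loop, blocking_call and the timeout parameter are hypothetical names invented for this sketch. The point is only that the caller blocks on a queue until some loop thread actually runs the scheduled call, so if the loop is not processing calls, the wait never ends unless it has a timeout.

```python
import queue
import threading


def run_loop(calls):
    """Stand-in for the reactor: run queued callables until None arrives."""
    while True:
        fn = calls.get()
        if fn is None:
            break
        fn()


def blocking_call(calls, f, *args, timeout=None):
    """Simplified model of blockingCallFromThread, plus an optional timeout."""
    results = queue.Queue()
    # Schedule the call on the loop (the role reactor.callFromThread plays).
    calls.put(lambda: results.put(f(*args)))
    # Block until the loop has run the call, or give up after `timeout`.
    return results.get(timeout=timeout)


calls = queue.Queue()

# Case 1: no loop thread is consuming `calls`, so the wait can only time out.
try:
    blocking_call(calls, str.upper, "ping", timeout=0.2)
    hung = False
except queue.Empty:
    hung = True

# Case 2: once a loop thread is running, scheduled calls go through.
loop = threading.Thread(target=run_loop, args=(calls,), daemon=True)
loop.start()
answer = blocking_call(calls, str.upper, "pong", timeout=2)

calls.put(None)  # tell the loop to exit
loop.join()
```

In the real worker, the equivalent of the timeout would at least turn a silent hang into a visible error, though it would not fix whatever is keeping the reactor thread from running the call.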
Update: 27 April 2020
The problem was solved by using crochet to run the Twisted reactor. I updated the function as follows:
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
    from crochet import setup
    setup()
    print('started new thread')
Upvotes: 2
Views: 655
Reputation: 48345
With some care, which you seem to have taken, you can run the Twisted reactor in one thread. However, you cannot run it in more than one thread, which I suppose is what happens when you use it with Celery. The reactor has both instance and global state, which will get stomped on if it is run in more than one thread.
Instead, try using crochet to coordinate calls onto a reactor running in a single non-main thread from as many other threads as you like.
Upvotes: 1