Reputation: 1159
OK, I know that's a mouthful. I'm at a loss how to even approach this. I want to run threads that process data in parallel with Flask, but I haven't seen many people doing so. This is for a personal web app, so I don't want to go down the path of Celery and RabbitMQ.
I already created a module that connects to a stock broker's API and streams stock data. For simplicity's sake, let's just say it generates a random number at a rate of 1 number per second. At each tick of new data created by the Number Generator thread, I want a few other threads to process that same number. Let's call them Math threads. Once they've completed processing the most recent number, I want the results to be combined (into JSON) and sent over a Websocket. I have independently been able to send data over a Websocket using Flask-Sockets.
Here's an illustration of what I want to accomplish, where each box can be thought of as a thread.
             -------------
             |  Number   |
             | Generator |  (rate of 1 number / second)
             -------------
                   |  (same number sent to all 3 math threads)
              -----+-----
             /     |     \
          v        v        v
    ---------  ---------  ---------
    | math1 |  | math2 |  | math3 |
    ---------  ---------  ---------
        |          |          |  (results combined and sent over Websocket)
        v          v          v
---------------------------------------
|      Flask      | WebSocket Handler |
---------------------------------------
And here's the simple websocket code.
import json
import time
from random import randrange

from flask import Flask, render_template
from flask_sockets import Sockets

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/stream')
def stream_socket(ws):
    while True:  # for now just sending endless stream of time + random number every second
        message = {"time": int(time.time()*1000), "data": randrange(100)}
        ws.send(json.dumps(message))
        time.sleep(1)

@app.route('/')
def test():
    return render_template('main.html')

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True, threaded=True)
So I have the "number generator" module (actually the stock quote streamer) and the websocket connection working independently. I just need to connect it all together with the threading, which is where I'm struggling. If that number generator were a simple random number generator, and the math threads were also simple (e.g. 2*x, sinc(x), etc.), then I'm curious if someone could get me going in the right direction with the threading done in parallel to Flask. Perhaps some skeleton code. Thank you.
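For reference, the diagram above can be sketched with nothing but the standard library's `threading` and `queue` modules. This is only a rough skeleton under a few assumptions: it is written for Python 3 (the module is named `Queue` on Python 2), the three math functions and every name in it (`generator`, `math_worker`, `combiner`, `run_pipeline`, `outbox`) are made up for illustration, and the random number stands in for the real quote stream. Each math thread gets its own inbox so that all three see the same number, and a combiner thread waits until it has all three results for a tick before building the JSON.

```python
import json
import math
import queue
import threading
import time
from random import randrange

def sinc(x):
    return 1.0 if x == 0 else math.sin(x) / x

# Hypothetical math functions, one per math thread.
MATH_FUNCS = {"double": lambda x: 2 * x, "square": lambda x: x * x, "sinc": sinc}

def generator(inboxes, ticks, interval):
    """Send the same (tick, number) pair to every math thread's inbox."""
    for tick in range(ticks):
        number = randrange(100)
        for inbox in inboxes:
            inbox.put((tick, number))
        time.sleep(interval)
    for inbox in inboxes:
        inbox.put(None)  # sentinel: no more data

def math_worker(name, func, inbox, results):
    """Process each number from this worker's inbox and report the result."""
    while True:
        item = inbox.get()
        if item is None:
            results.put(None)  # pass the sentinel on to the combiner
            break
        tick, number = item
        results.put((tick, name, number, func(number)))

def combiner(results, n_workers, outbox):
    """Emit one JSON string per tick once every worker has reported for it."""
    pending, finished = {}, 0
    while finished < n_workers:
        item = results.get()
        if item is None:
            finished += 1
            continue
        tick, name, number, value = item
        entry = pending.setdefault(tick, {"data": number, "results": {}})
        entry["results"][name] = value
        if len(entry["results"]) == n_workers:
            del pending[tick]
            entry["time"] = int(time.time() * 1000)
            outbox.put(json.dumps(entry))
    outbox.put(None)

def run_pipeline(ticks=3, interval=1.0):
    """Run the whole pipeline and return the combined messages as dicts."""
    inboxes = [queue.Queue() for _ in MATH_FUNCS]
    results, outbox = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=generator, args=(inboxes, ticks, interval))]
    for (name, func), inbox in zip(MATH_FUNCS.items(), inboxes):
        threads.append(threading.Thread(target=math_worker,
                                        args=(name, func, inbox, results)))
    threads.append(threading.Thread(target=combiner,
                                    args=(results, len(MATH_FUNCS), outbox)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    messages = []
    while True:
        msg = outbox.get()
        if msg is None:
            break
        messages.append(json.loads(msg))
    return messages
```

In a real app the threads would run indefinitely and something on the Flask side would read from `outbox` instead of `run_pipeline` draining it at the end; the sentinel handling is only there so the sketch can shut down cleanly.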
UPDATE: I can get separate threads to run in parallel with Flask, as in the simple example below. This works when I run 'python test.py', but to get Websockets to work I use gunicorn like so: 'gunicorn -k flask_sockets.worker test:app'. This, however, seems to prevent the multithreading from working.
UPDATE2: I was able to get Websockets and multithreading to work using gevent rather than gunicorn. Updated code below.
import json
import threading
import time
from random import randrange

from flask import Flask, render_template
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

class myThread(threading.Thread):
    def __init__(self, wait, msg):
        super(myThread, self).__init__()
        self.wait = wait
        self.msg = msg

    def run(self):
        # print the message five times, pausing `wait` seconds before each
        for i in range(5):
            time.sleep(self.wait)
            print(self.msg)

app = Flask(__name__)
sockets = Sockets(app)

@sockets.route('/stream')
def stream_socket(ws):
    while True:  # for now just sending endless stream of time + random number every second
        message = {"time": int(time.time()*1000), "data": randrange(100)}
        ws.send(json.dumps(message))
        time.sleep(1)

@app.route('/')
def test():
    return render_template('main.html')

if __name__ == '__main__':
    thread1 = myThread(2, "thread1")
    thread2 = myThread(3, "thread2")
    thread1.start()
    thread2.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()
    #app.run(host='0.0.0.0',debug=True,threaded=True)
Now that I've been able to run threads in parallel with Flask while it independently sends data over a Websocket, I suppose my biggest question is how to share the results of the parallel worker threads with the function behind the @sockets.route decorator, so it can send out the data that was processed by those separate threads. Is there a way to place that @sockets.route decorator within a thread?
Upvotes: 3
Views: 3327
Reputation: 1159
OK, something like this seems to work. A separate thread generates numbers at a rate of one per second and places them in a queue. The Websocket handler gets data from the queue when it's available and sends it out on the Websocket. Obviously this is just a simple example, but it gets me going in the right direction. I'd still be curious if anyone has any suggestions.
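import json
import threading
import time
import Queue  # Python 2 module name; it is "queue" on Python 3
from random import randrange

from flask import Flask, render_template
from flask_sockets import Sockets
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler

app = Flask(__name__)
sockets = Sockets(app)
myQueue = Queue.Queue(10)  # holds at most 10 pending messages

class myThread(threading.Thread):
    def __init__(self, length):
        super(myThread, self).__init__()
        self.length = length

    def run(self):
        # produce one message per second, `length` times
        for i in range(self.length):
            time.sleep(1)
            myQueue.put({"time": int(time.time()*1000), "data": randrange(100)})

@sockets.route('/stream')
def stream_socket(ws):
    while True:
        message = myQueue.get()  # blocks until the thread has produced something
        ws.send(json.dumps(message))

@app.route('/')
def test():
    return render_template('main.djhtml')

if __name__ == '__main__':
    thread1 = myThread(30)
    thread1.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()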
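One thing to watch with a single shared queue: each item is handed to exactly one `get()` caller, so if two browsers connect to /stream they would split the numbers between them rather than each seeing all of them. A possible way around that is to give every connection its own queue and have the producer copy each message into all of them. Below is a minimal sketch of that idea, assuming Python 3 (`queue` instead of `Queue`); the `Broadcaster` class and its method names are hypothetical, not part of Flask-Sockets or gevent.

```python
import queue
import threading

class Broadcaster:
    """Fan each published message out to one queue per subscriber (hypothetical helper)."""

    def __init__(self, maxsize=10):
        self._lock = threading.Lock()
        self._subscribers = set()
        self._maxsize = maxsize

    def subscribe(self):
        """Create and register a new per-client queue."""
        q = queue.Queue(self._maxsize)
        with self._lock:
            self._subscribers.add(q)
        return q

    def unsubscribe(self, q):
        """Stop delivering messages to this client's queue."""
        with self._lock:
            self._subscribers.discard(q)

    def publish(self, message):
        """Copy the message to every subscriber without blocking the producer."""
        with self._lock:
            subscribers = list(self._subscribers)
        for q in subscribers:
            try:
                q.put_nowait(message)
            except queue.Full:
                pass  # slow client: drop this message rather than stall the producer
```

With something like this, the generator thread would call `publish(...)` instead of `myQueue.put(...)`, and `stream_socket` would call `subscribe()` when a client connects, loop on `get()` from the returned queue, and call `unsubscribe()` in a `finally` block when the connection ends. Dropping messages for a full queue is just one choice; blocking or disconnecting the slow client would be alternatives.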
Upvotes: 3