Reputation: 23
I'm writing a function dealing with two WebSockets and the response of each WebSocket will change a shared DataFrame df.
import json
import asyncio
import websockets

@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):
    yield from dataSocket.send(dataRequest)
    yield from quoteSocket.send(quoteRequest)
    response = yield from dataSocket.recv()  # skip first response
    response = yield from quoteSocket.recv()  # skip first response
    while True:
        response = yield from dataSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)
        response = yield from quoteSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)
I'm not sure, but the current code seems to process the two WebSockets in turns. I want to process responses in a "first in, first out" manner, regardless of which WebSocket they come from. What changes should I make to achieve this?
Upvotes: 1
Views: 427
Reputation: 4954
Because you are using the two yield from statements inside the same while loop, they are processed in order and then repeated ad infinitum. So the loop always waits until it gets a response from dataSocket, then waits until it gets a response from quoteSocket, and then repeats.
Tasks work well for what you are trying to do because they let the coroutines run independently of each other. If you start two separate coroutines, each in its own Task wrapper, each one waits for its own next response without blocking the other.
For example:
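import json
import asyncio
import websockets

@asyncio.coroutine
def coroutine_1(df, dataSocket, dataRequest):
    yield from dataSocket.send(dataRequest)
    response = yield from dataSocket.recv()  # skip first response
    while True:
        # Waits only on dataSocket; quoteSocket is handled by coroutine_2
        response = yield from dataSocket.recv()
        print("<< {}".format(json.loads(response)))
        # Note: this rebinds the local name df; coroutine_2 only sees the
        # change if changeRecord modifies the DataFrame in place
        df = changeRecord(df, response)

@asyncio.coroutine
def coroutine_2(df, quoteSocket, quoteRequest):
    yield from quoteSocket.send(quoteRequest)
    response = yield from quoteSocket.recv()  # skip first response
    while True:
        # Waits only on quoteSocket; dataSocket is handled by coroutine_1
        response = yield from quoteSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):
    # Wrap each coroutine in its own Task so they run independently
    websocket_task_1 = asyncio.ensure_future(coroutine_1(df, dataSocket, dataRequest))
    websocket_task_2 = asyncio.ensure_future(coroutine_2(df, quoteSocket, quoteRequest))
    # Block until both tasks finish (they loop forever unless recv() raises)
    yield from asyncio.wait([websocket_task_1, websocket_task_2])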
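On newer Python versions the same idea can be written with async/await, since @asyncio.coroutine was removed in Python 3.11. Below is a minimal sketch of that, not a drop-in replacement. FakeSocket, feed_in_order, the None end-of-stream marker and the records list are all hypothetical stand-ins (for the real websockets, the server, a closed connection and the shared DataFrame) so that the example runs without a network. It also appends to one shared object in place, on the assumption that rebinding df inside each coroutine would not be visible to the other one.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a websocket connection (not the websockets API)."""

    def __init__(self):
        self._messages = asyncio.Queue()

    def feed(self, message):
        # Simulate a message arriving from the server.
        self._messages.put_nowait(message)

    async def recv(self):
        # Suspend until a message is available, as a real recv() would.
        return await self._messages.get()


async def consume(socket, records):
    # Each consumer waits only on its own socket, so a quiet socket
    # never delays messages that arrive on the other one.
    while True:
        message = await socket.recv()
        if message is None:  # hypothetical end-of-stream marker
            break
        records.append(message)  # mutate the shared object in place


async def feed_in_order(data_socket, quote_socket):
    # Scripted arrival order: two quotes arrive before the first data message.
    arrivals = [
        (quote_socket, "quote-1"),
        (quote_socket, "quote-2"),
        (data_socket, "data-1"),
        (quote_socket, "quote-3"),
        (data_socket, "data-2"),
    ]
    for socket, message in arrivals:
        socket.feed(message)
        await asyncio.sleep(0)  # yield so the waiting consumer can run
    data_socket.feed(None)
    quote_socket.feed(None)


async def main():
    records = []  # shared state, standing in for the DataFrame
    data_socket, quote_socket = FakeSocket(), FakeSocket()
    # gather() schedules all three coroutines as independent tasks.
    await asyncio.gather(
        consume(data_socket, records),
        consume(quote_socket, records),
        feed_in_order(data_socket, quote_socket),
    )
    return records


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the alternating loop from the question, quote-1 would have had to wait for a data message first. Here the records come out in arrival order, because neither consumer waits on the other's socket.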
Upvotes: 1