kinreyli

Reputation: 23

How to process multiple WebSockets in a FIFO way in Python?

I'm writing a function that reads from two WebSockets. Each response from either WebSocket updates a shared DataFrame, df.

import json
import asyncio
import websockets

@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):

    yield from dataSocket.send(dataRequest)
    yield from quoteSocket.send(quoteRequest)

    response = yield from dataSocket.recv()     # skip first response
    response = yield from quoteSocket.recv()    # skip first response

    while True:

        response = yield from dataSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

        response = yield from quoteSocket.recv()
        print("<< {}".format(json.loads(response)))
        df = changeRecord(df, response)

I'm not sure, but the current code seems to process the two WebSockets in turns. I want to process responses in a "first in, first out" manner, regardless of which WebSocket they come from. What should I change to achieve this?

Upvotes: 1

Views: 427

Answers (1)

songololo

Reputation: 4954

Because both yield from statements sit inside the same while loop, they run in order on every iteration, indefinitely.

So the loop always waits for a response from dataSocket, then waits for a response from quoteSocket, and repeats. A message that arrives on quoteSocket in the meantime is not processed until dataSocket has delivered its next one.
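The alternation can be reproduced without a network. Below is a minimal sketch in which two asyncio.Queue objects stand in for the sockets; the names feed, alternate and alternation_demo are made up for illustration and are not part of the websockets library. It uses async/await syntax, which needs Python 3.7+ for asyncio.run.

```python
import asyncio


async def feed(queue, name, count, interval):
    """Hypothetical producer: emits `count` messages, one every `interval` seconds."""
    for i in range(1, count + 1):
        await asyncio.sleep(interval)
        queue.put_nowait("{}-{}".format(name, i))


async def alternate(data_q, quote_q, rounds):
    """Read the two sources strictly in turn, like the loop in the question."""
    order = []
    for _ in range(rounds):
        order.append(await data_q.get())    # blocks until the data source delivers
        order.append(await quote_q.get())   # only then is the quote source read
    return order


async def alternation_demo():
    data_q, quote_q = asyncio.Queue(), asyncio.Queue()
    feeders = [
        asyncio.ensure_future(feed(data_q, "data", 2, 0.05)),    # slow source
        asyncio.ensure_future(feed(quote_q, "quote", 2, 0.01)),  # fast source
    ]
    order = await alternate(data_q, quote_q, rounds=2)
    await asyncio.gather(*feeders)
    return order


if __name__ == "__main__":
    print(asyncio.run(alternation_demo()))
```

Both quote messages are queued before the first data message arrives, yet the consumer still handles data-1 first, because it is parked on the data source.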

asyncio Tasks work well for what you are trying to do because they let coroutines run independently of each other. If you start two separate coroutines, each wrapped in its own Task, each one waits for its own next response without blocking the other.

For example:

import json
import asyncio
import websockets

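@asyncio.coroutine
def coroutine_1(df, dataSocket, dataRequest):
    yield from dataSocket.send(dataRequest)
    response = yield from dataSocket.recv()     # skip first response
    while True:
        response = yield from dataSocket.recv()
        print("<< {}".format(json.loads(response)))
        # This rebinding of df is local to coroutine_1, so changeRecord must
        # modify df in place for coroutine_2 to see the update.
        df = changeRecord(df, response)

@asyncio.coroutine
def coroutine_2(df, quoteSocket, quoteRequest):
    yield from quoteSocket.send(quoteRequest)
    response = yield from quoteSocket.recv()    # skip first response
    while True:
        response = yield from quoteSocket.recv()
        print("<< {}".format(json.loads(response)))
        # Same caveat as in coroutine_1: df is rebound locally here.
        df = changeRecord(df, response)

@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):

    # Wrap each coroutine in a Task so the two receive loops run concurrently.
    websocket_task_1 = asyncio.ensure_future(coroutine_1(df, dataSocket, dataRequest))
    websocket_task_2 = asyncio.ensure_future(coroutine_2(df, quoteSocket, quoteRequest))

    # Both loops run forever, so this only returns if a task fails or is cancelled.
    yield from asyncio.wait([websocket_task_1, websocket_task_2])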
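If strict arrival order and a single owner for df matter, one possible variation (not part of the original answer) is to have each socket task push its messages into one asyncio.Queue and let a single consumer apply them. The sketch below uses async/await, since @asyncio.coroutine was removed in Python 3.11. FakeSocket, pump and merge_demo are hypothetical names for the demo; a real connection from the websockets library would take FakeSocket's place, and the consumer would call changeRecord where the comment indicates.

```python
import asyncio
import json


class FakeSocket:
    """Hypothetical stand-in for a WebSocket connection, for this demo only."""

    def __init__(self, replies, interval):
        self._replies = list(replies)
        self._interval = interval

    async def send(self, message):
        pass  # a real socket would transmit the request here

    async def recv(self):
        if not self._replies:
            await asyncio.Event().wait()   # nothing left: block like an idle socket
        await asyncio.sleep(self._interval)
        return self._replies.pop(0)


async def pump(socket, request, queue):
    """Send the request, drop the first reply, forward the rest in arrival order."""
    await socket.send(request)
    await socket.recv()                    # skip first response, as in the question
    while True:
        await queue.put(await socket.recv())


async def merge_demo():
    queue = asyncio.Queue()
    data_socket = FakeSocket(['{"ack": 1}', '{"data": 1}', '{"data": 2}'], 0.1)
    quote_socket = FakeSocket(['{"ack": 1}', '{"quote": 1}', '{"quote": 2}'], 0.02)
    pumps = [
        asyncio.ensure_future(pump(data_socket, "dataRequest", queue)),
        asyncio.ensure_future(pump(quote_socket, "quoteRequest", queue)),
    ]
    processed = []
    for _ in range(4):                     # a real consumer would loop forever
        message = json.loads(await queue.get())
        processed.append(message)          # real code: df = changeRecord(df, ...)
    for task in pumps:
        task.cancel()
    await asyncio.gather(*pumps, return_exceptions=True)
    return processed


if __name__ == "__main__":
    print(asyncio.run(merge_demo()))
```

With these delays, both quote messages are handled before the first data message. Because only the consumer touches the shared state, it no longer matters whether changeRecord modifies df in place or returns a new object.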

Upvotes: 1
