Jordan Parker

Reputation: 41

Fastest way to pull data from a WebSocket

I'm trying to pull high-frequency data in small packets from a single WebSocket and push it to my AWS Kinesis Stream for processing. I am using Python 3.6.

At the moment I am using Python's synchronous WebSocket library, websocket-client, and I have no problem pulling messages and pushing them to my stream.

Would I be better off using the websockets library for asynchronous sockets? I'm concerned that the blocking receive call in the loop may be a bottleneck.

import json
import ssl

import boto3
import websocket as ws  # the websocket-client package

# aws_key, aws_secretkey and url are defined elsewhere.
while True:
    session = boto3.Session(aws_key, aws_secretkey)
    kinesis = session.client('kinesis', region_name='us-east-1')
    conn = ws.create_connection(url, sslopt={"cert_reqs": ssl.CERT_NONE})
    count = 0
    data = []
    try:
        while True:
            # recv() blocks until the next message arrives.
            msg = json.dumps(json.loads(conn.recv())['data'])
            data.append({'Data': msg, 'PartitionKey': 'trade'})
            count += 1
            if count == 100:
                # Flush a batch of 100 records to Kinesis.
                kinesis.put_records(StreamName='Binance_Stream', Records=data)
                count = 0
                data = []
                print('100 msg posted')
    except ws.WebSocketConnectionClosedException as e:
        print('Connection Error: ' + str(e))

Upvotes: 2

Views: 2467

Answers (2)

eatmeimadanish

Reputation: 3907

In addition to the other answer, which references gevent:

The only blocking happens at the receive call. You can put an upper bound on that wait with a gevent Timeout:

from gevent import Timeout

wsock = request.environ.get('wsgi.websocket')
message = None  # stays None if nothing arrives before the timeout
with Timeout(2, False):
    message = wsock.receive()

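The 2 is the timeout in seconds. Passing False as the second argument makes the timeout expire silently instead of raising an exception, so the code after the with block simply continues.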
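If you go the asyncio route mentioned in the question instead of gevent, the same bounded wait can be expressed with asyncio.wait_for. This is only a minimal sketch: fake_recv, recv_with_timeout and run are hypothetical helpers made up for illustration, and fake_recv stands in for the awaitable receive call of a real connection.

```python
import asyncio


async def fake_recv(delay):
    """Hypothetical stand-in for an awaitable receive call: sleeps, then returns a message."""
    await asyncio.sleep(delay)
    return '{"data": {"p": "1.0"}}'


async def recv_with_timeout(recv_coro, seconds):
    """Return the received message, or None if nothing arrived within `seconds`."""
    try:
        return await asyncio.wait_for(recv_coro, timeout=seconds)
    except asyncio.TimeoutError:
        return None


def run(coro):
    """Run a coroutine on a fresh event loop (also works on Python 3.6)."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()


# A message that arrives in time is returned unchanged.
fast = run(recv_with_timeout(fake_recv(0.01), 0.5))
# A message that takes longer than the timeout yields None.
slow = run(recv_with_timeout(fake_recv(0.5), 0.05))
```

As with the gevent version, the caller has to handle the "nothing arrived" case, here signalled by None.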

Upvotes: 0

rbrook

Reputation: 81

The easiest way to run a websocket client concurrently is to use the Python library gevent.

This lets you run your client concurrently without changing your existing code (too much). You only need to manage the greenlets, which are similar to threads but safer, easier to handle and highly recommended for IO-bound work.
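To make the idea concrete, here is a rough sketch of the shape this usually takes: one worker only receives, another only batches and sends, and a queue sits between them. It deliberately uses standard-library threads and queue.Queue rather than gevent so that it runs without extra packages. The names receiver, sender and run_pipeline are made up for illustration, the source iterable stands in for a loop around conn.recv(), and flush stands in for kinesis.put_records.

```python
import queue
import threading

BATCH_SIZE = 100   # the question flushes every 100 messages
_DONE = object()   # sentinel: the receiver has no more messages


def receiver(source, q):
    """Read messages as they arrive and hand them straight to the queue."""
    for msg in source:              # stand-in for a loop around conn.recv()
        q.put(msg)
    q.put(_DONE)


def sender(q, flush, batch_size=BATCH_SIZE):
    """Collect messages into batches and pass each full batch to flush()."""
    batch = []
    while True:
        msg = q.get()
        if msg is _DONE:
            break
        batch.append({'Data': msg, 'PartitionKey': 'trade'})
        if len(batch) == batch_size:
            flush(batch)            # stand-in for kinesis.put_records(...)
            batch = []
    if batch:                       # flush the final partial batch
        flush(batch)


def run_pipeline(source, flush, batch_size=BATCH_SIZE):
    """Run one receiver and one sender until the source is exhausted."""
    q = queue.Queue()
    workers = [
        threading.Thread(target=receiver, args=(source, q)),
        threading.Thread(target=sender, args=(q, flush, batch_size)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()


# Demo with fake data: 250 messages are split into batches of 100, 100 and 50.
sent = []
run_pipeline((str(i) for i in range(250)), sent.append)
batch_sizes = [len(b) for b in sent]
```

With gevent, the same layout can be written with gevent.spawn and gevent.queue.Queue in place of the thread and queue objects. Whether any of this helps depends on whether the put_records call is really the slow step, so it is worth timing the receive and send halves separately before restructuring.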

Upvotes: 2
