Parapara
Parapara

Reputation: 13

How to filter specific messages received from a websocket api in python as they stream in and write them to CSV

I am VERY new to coding and Python and I am trying to just receive live trade data from the Bitfinex API, and filter out specific messages as they come in because it gives duplicates. I want to take these filtered messages and then output them to a csv file continuously.

Specifically, I want to save the messages titled "te" (see output from API below) because these are the trades that are executed as they are executed. The stream gives "tu" as well, which are duplicates. I want to just take the "te" and download them live into a csv for other processing and saving.

Here is my code, which is a stripped down version of one that I found online:

import websocket
import time
import sys
from datetime import datetime, timedelta, timezone

import sched, time
import json
import csv
import requests

class BitfinexWebSocketReader():
    endpoint = "wss://api.bitfinex.com/ws/2"
    def __init__(self):
        #websocket.enableTrace(True)
        self.ws = websocket.WebSocketApp(
                BitfinexWebSocketReader.endpoint,
                on_message = self.on_message,                 
                on_error = self.on_error, 
                on_close = self.on_close
         )
        self.ws.on_open = self.on_open

        try:
            self.run()
        except KeyboardInterrupt:
            self.ws.close()

    def run(self):
        self.ws.run_forever()
        print("### run ###")
        pass

    def on_message(self, ws, message):
        print(message)

    def on_error(self, ws, error):
        print(error)
        sys.exit()

    def on_close(self, ws):
        print("### closed ###")

    def on_open(self, ws):
        #print("### open ###")
        ws.send(json.dumps({"event": "subscribe", "channel": "Trades", "symbol": "tBTCUSD"}))

if __name__=="__main__":
    BitfinexWebSocketReader()

And here is an example of a couple seconds of the output:

{"event":"info","version":2,"serverId":"88c6df7e-5159-4a8e-b1c4-f08904aeeb0a","platform":{"status":1}}
{"event":"subscribed","channel":"trades","chanId":23,"symbol":"tBTCUSD","pair":"BTCUSD"}
[23,[[281534165,1534448458635,0.005,6401.5],[281534164,1534448457975,0.01999998,6401.5],[281534139,1534448438766,-0.31749096,6401.4],[281534132,1534448438051,0.005,6401.5],[281534116,1534448432624,-0.051,6401.4],[281534099,1534448425380,0.18699482,6401.5],[281534097,1534448424900,0.013558,6401.5],[281534096,1534448424718,0.0514726,6401.5],[281534083,1534448415788,0.005,6401.8],[281534080,1534448415568,-1,6400.8],[281534079,1534448415566,-1,6401.8],[281534073,1534448409395,-0.0325,6403],[281534053,1534448398108,-0.2498,6405.1],[281534048,1534448396370,-0.25,6404.9],[281534043,1534448394675,0.42406762,6400],[281534029,1534448390257,0.30000001,6400],[281534028,1534448390236,0.30000001,6400],[281534027,1534448389714,1,6400],[281534025,1534448389033,1.18922278,6400],[281534024,1534448389030,0.41523564,6399.7],[281534023,1534448389028,0.39554158,6399.7],[281534013,1534448384920,0.025,6399.7],[281534011,1534448382885,0.018794,6399.7],[281534008,1534448380817,-1.49155951,6399.6],[281534007,1534448380815,-2.5,6399.6],[281534006,1534448380813,-0.34,6399.6],[281534005,1534448380811,-0.15098794,6399.6],[281534004,1534448380808,-0.29899445,6399.6],[281534000,1534448379152,-0.005,6399.6],[281533999,1534448377821,-0.16825162,6399.6]]]
[23,"hb"]
[23,"te",[281534199,1534448478028,-0.00937287,6401.4]]
[23,"te",[281534200,1534448478031,-0.29062714,6401.4]]
[23,"te",[281534201,1534448478036,-0.30000001,6401.4]]
[23,"tu",[281534201,1534448478036,-0.30000001,6401.4]]
[23,"tu",[281534199,1534448478028,-0.00937287,6401.4]]
[23,"tu",[281534200,1534448478031,-0.29062714,6401.4]]
[23,"te",[281534204,1534448478180,-0.65915285,6401.4]]
[23,"tu",[281534204,1534448478180,-0.65915285,6401.4]]
[23,"hb"]
[23,"te",[281534224,1534448479402,-0.114,6399.9]]
[23,"tu",[281534224,1534448479402,-0.114,6399.9]]
[23,"te",[281534232,1534448480466,-0.00012512,6399.9]]
[23,"tu",[281534232,1534448480466,-0.00012512,6399.9]]

Bonus question: why does that super long first entry pop up every time I execute the code?

Upvotes: 1

Views: 1781

Answers (1)

prithajnath
prithajnath

Reputation: 2115

You can initialize some kind of data structure in the constructor, like a list() or a set() to store the desired messages and then filter them in the on_message method.

So in your constructor

def __init__(self):
    #websocket.enableTrace(True)
    self.ws = websocket.WebSocketApp(
            BitfinexWebSocketReader.endpoint,
            on_message = self.on_message,                 
            on_error = self.on_error, 
            on_close = self.on_close
     )
    self.ws.on_open = self.on_open
    self.store = []

    try:
        self.run()
    except KeyboardInterrupt:
        self.ws.close()

And in your on_message method

def on_message(self, ws, message):
    if "te" in message:
        self.store.append(message)
    print(message)

Upvotes: 1

Related Questions