Jürgen Zornig

Reputation: 1244

Replay timer-based data through a Python websocket

Currently I am working on an IoT mockup client, which reads in a couple of real-world time-series CSVs and "replays" them based on their timestamps and the system clock (threading module).

So basically I have a pandas DataFrame which holds all the old data, and a timer tick handler function which extracts the corresponding rows from it and yields them to any data sink.

This works perfectly if my timer tick handler uses requests.post(...) to simply post a text body gathered from df[...<current-timeslice-filter>...].to_csv().

Now I want to stream this data to a server API, so we decided to transmit it via WebSockets rather than HTTP REST. This is where things get tricky. The websockets module relies heavily on asyncio, which needs its own event loop. As my timer is already a kind of event loop (based on threading.Timer), and I must admit that I don't fully understand the concepts around asyncio, I think the two don't quite fit together.

At least I do not know how to integrate the websocket.send() method into my handler method so that it is run inside the asyncio event loop.
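One possible bridge, sketched under assumptions: keep the thread-based ticker, run the asyncio event loop in a second thread, and hand each payload over with asyncio.run_coroutine_threadsafe(), the standard-library call for scheduling a coroutine from another thread. The names fake_send, start_loop_in_thread, tick_handler and sent are hypothetical; fake_send stands in for websocket.send() so the sketch runs without a server.

```python
import asyncio
import threading

sent = []  # hypothetical sink; a real client would hold a websocket instead


async def fake_send(message):
    # Stand-in for `await websocket.send(message)` (assumption, no network).
    await asyncio.sleep(0)
    sent.append(message)


def start_loop_in_thread():
    # Run a dedicated asyncio event loop in a daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def tick_handler(loop, message):
    # Called from the timer thread: schedule the coroutine on the loop
    # and block until it has finished (or 5 seconds have passed).
    future = asyncio.run_coroutine_threadsafe(fake_send(message), loop)
    future.result(timeout=5)


loop, loop_thread = start_loop_in_thread()
for i in range(3):
    tick_handler(loop, f"row {i}")

# Stop the loop from the main thread, then release its resources.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
print(sent)
```

In a real client, tick_handler would be called from the ticker's do() method, and the coroutine passed to run_coroutine_threadsafe() would wrap the actual websocket send.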

DataFrame.to_csv(...) can be given a file handle as path_or_buf, and I would prefer to use a websocket just like a file handle and pass it there, to flush my data through.
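A websocket cannot simply take the place of a file handle, because a file-style write() is synchronous while websocket.send() is a coroutine. A rough sketch of one workaround is to serialize each time slice into an in-memory buffer and send the resulting text as a single message. The sketch uses the standard-library csv module as a stand-in so that it runs without pandas; calling DataFrame.to_csv() without a target returns the CSV as a string and could supply the payload instead. slice_to_csv and fake_send are hypothetical names, and the rows are made-up sample values.

```python
import asyncio
import csv
import io


def slice_to_csv(rows):
    # Serialize one time slice into CSV text held in memory.
    # With pandas, `df_slice.to_csv()` (no target) also returns a string.
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


async def fake_send(outbox, payload):
    # Stand-in for `await websocket.send(payload)` (assumption, no network).
    outbox.append(payload)


async def main():
    outbox = []
    # Made-up sample rows: (timestamp, value)
    rows = [("2020-01-01T00:00:00", 21.5), ("2020-01-01T00:00:01", 21.7)]
    await fake_send(outbox, slice_to_csv(rows))
    return outbox


outbox = asyncio.run(main())
print(outbox[0])
```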

EDIT: what I have so far...

That's my timer class, which calls the method do() every interval seconds:

from threading import Thread, Event

class TimeTicker(Thread):
    def __init__(self, interval=1):
        Thread.__init__(self)
        self.interval = interval
        self.stopped = Event()

    def run(self):
        while not self.stopped.wait(self.interval):
            self.do()

    def do(self):
        print('tick')

    def get_stopflag(self):
        return self.stopped

Now the basic snippet for using websockets and asyncio is...

import asyncio
import websockets

async def hello():
    uri = "ws://echo.websocket.org"
    async with websockets.connect(uri) as websocket:
        await websocket.send(thread.stream())
        r = await websocket.recv()
        print(r)

asyncio.get_event_loop().run_until_complete(hello())

I already tried to make my do() method async, but I am not able to initialize my TimeTicker class inside the asyncio event loop so that the method calls are awaited.

To keep things clear, I would like to initialize the websocket connection outside the TimeTicker object (it should only provide time-series data every second and pass it to websocket.send()). However, I am not sure where this handover of data should happen. There might also be a better design for my TimeTicker class that yields data every second instead of just calling a method. Either way, I'd appreciate advice on this.
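The idea of yielding data every second maps fairly naturally onto an async generator. A minimal sketch, assuming the slices are already prepared as strings: tick_slices, replay and fake_send are hypothetical names, and fake_send stands in for the send method of a websocket that was opened outside the ticker.

```python
import asyncio


async def tick_slices(slices, interval):
    # Yield one prepared slice per interval; the first one is immediate.
    for index, payload in enumerate(slices):
        if index:
            await asyncio.sleep(interval)
        yield payload


async def replay(slices, interval, send):
    # `send` is any coroutine function, e.g. a websocket's send method.
    count = 0
    async for payload in tick_slices(slices, interval):
        await send(payload)
        count += 1
    return count


async def main():
    received = []

    async def fake_send(payload):
        # Stand-in for `await websocket.send(payload)` (assumption).
        received.append(payload)

    count = await replay(["a,1\n", "b,2\n", "c,3\n"], 0.01, fake_send)
    return count, received


count, received = asyncio.run(main())
print(count, received)
```

This keeps the data source unaware of the transport: the generator only produces slices, and whoever owns the connection decides what to do with them.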

Hint: TimeTicker is only a superclass of my data source class, which actually holds the pandas DataFrame with approx. 200,000 rows of time-series data read from CSV as a reservoir to send.

Solution: based on @wowkin2's answer, my TimeTicker class is now implemented with asyncio alone...

import asyncio
import websockets


class TimeTicker:
    is_stopped = False

    def __new__(cls, _loop, _uri, interval=1):
        instance = super().__new__(cls)
        instance.interval = interval
        instance.uri = _uri
        instance.task = _loop.create_task(instance.run())
        return instance.task

    async def run(self):
        async with websockets.connect(self.uri) as self.ws:
            while True:
                await self.do()
                await asyncio.sleep(self.interval)

    async def do(self):
        message = 'ping'
        await self.ws.send(message)
        r = await self.ws.recv()
        print(r)

    def stop(self):
        self.task.cancel()
        self.is_stopped = True

uri = "ws://echo.websocket.org"
loop = asyncio.get_event_loop()
task = TimeTicker(loop, uri, interval=5)
loop.run_until_complete(task)

Upvotes: 1

Views: 2152

Answers (1)

wowkin2

Reputation: 6355

You don't need the threading module if you are using asyncio.

Here is an example of how to do something periodically using asyncio. The only catch: you'll need a variable that holds an always-open connection, created without the context manager.

import asyncio
import websockets


class TimeTicker:
    is_stopped = False

    def __new__(cls, _loop, _ws, interval=1):
        instance = super().__new__(cls)
        instance.interval = interval
        instance.ws = _ws
        instance.task = _loop.create_task(instance.run())
        return instance.task

    async def run(self):
        while True:
            await self.do()
            await asyncio.sleep(self.interval)

    async def do(self):
        message = 'ping'
        await self.ws.send(message)
        r = await self.ws.recv()
        print(r)

    def stop(self):
        self.task.cancel()
        self.is_stopped = True


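uri = "ws://echo.websocket.org"

loop = asyncio.get_event_loop()
# connect() has to be awaited to get an open connection;
# without the await, ws would not have usable send()/recv() methods.
ws = loop.run_until_complete(websockets.connect(uri))
task = TimeTicker(loop, ws, interval=5)
loop.run_until_complete(task)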
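Note that because __new__ returns the task rather than the instance, the caller never gets hold of a TimeTicker object, so stop() cannot be reached from outside. A possible variant, sketched under assumptions, keeps the instance and exposes start() and stop() explicitly. FakeSocket is a hypothetical echo stand-in for the websocket connection so the sketch runs offline, and the replies list is added only to make the behaviour observable.

```python
import asyncio


class FakeSocket:
    # Hypothetical echo stand-in for a websocket connection.
    def __init__(self):
        self._last = None

    async def send(self, message):
        self._last = message

    async def recv(self):
        return self._last


class TimeTicker:
    def __init__(self, ws, interval=1):
        self.ws = ws
        self.interval = interval
        self.replies = []
        self.task = None

    def start(self):
        # Must be called while an event loop is running.
        self.task = asyncio.create_task(self.run())

    async def run(self):
        while True:
            await self.do()
            await asyncio.sleep(self.interval)

    async def do(self):
        await self.ws.send('ping')
        self.replies.append(await self.ws.recv())

    async def stop(self):
        # Cancel the periodic task and wait until it has unwound.
        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass


async def main():
    ticker = TimeTicker(FakeSocket(), interval=0.01)
    ticker.start()
    await asyncio.sleep(0.05)
    await ticker.stop()
    return ticker


ticker = asyncio.run(main())
print(len(ticker.replies) >= 1, ticker.task.cancelled())
```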

Upvotes: 3
