Reputation: 1244
Currently I am working on a IoT Mockup Client, which reads in a couple of real-world timeseries data CSVs and "replays" them based on their timestamp and the system clock (threading
module).
So basically I have a Pandas Dataframe which holds all the old data and a timer tick handler function which extracts the corresponding rows from it to yield them to any data sink.
This works perfectly if my timer-tick-handler uses requests.post(..
then to post simply a text body gathered from df[...<current-timeslice-filer>...].to_csv()
.
Now I want to stream this data to a server api, therefore we decided to transmit data via Websockets rather than HTTP-REST. There things start to get tricky. The websockets
module relies heavily on asyncio
which needs its own event loop. As my timer is already kind of an event loop (based on threading.timer
) and I must admit that I don't fully understand the concepts around asyncio, I think that this doesn't quite fit together.
At least I do not know how to integrate the websocket.send() method into my handler method so that it is run inside the asyncio event loop.
DataFrame.to_csv(... can be given a file handler file_or_buf
, and I would appreciate it more to use a websocket just like a file handler and provide it here, to flush my data through.
websockets
module be used to achieve this? Am I just getting it wrong?EDIT what I have so far...
Thats my timer class which calls method do()
every interval
seconds
from threading import Thread,Event
class TimeTicker(Thread):
def __init__(self, interval=1):
Thread.__init__(self)
self.interval = interval
self.stopped = Event()
def run(self):
while not self.stopped.wait(self.interval):
self.do()
def do(self):
print('tick')
def get_stopflag(self):
return self.stopped
Now the basic snippet for using websockets
and asyncio
is...
import asyncio
import websockets
async def hello():
uri = "ws://echo.websocket.org"
async with websockets.connect(uri) as websocket:
await websocket.send(thread.stream())
r = await websocket.recv()
print(r)
asyncio.get_event_loop().run_until_complete(hello())
I already tried to make my do()
method async
but I am not able to initialize my TimeTicker
class inside the asyncio event loop, so that method calls are 'awaited'
To keep things clear, I would like to initialize the websocket connection outside the TimeTicker object (it should only provide timeseries data every second and pass it to websocket.send()
. Nevertheless I am not sure where this pass of data should happen then. There might also be a better solution of my TimeTicker class to yield
data every second instead of just calling a method. Anyway I´d like to get advice on this.
Hint: TimeTicker is only a superclass on my datasource class which actually holds the pandas dataframe with approx. 200.000 rows of timeseries data read from CSV as a reservoir to send.
Solution: based on @wowkin2 's answer my TimeTicker Class is now realized with asyncio alone...
import asyncio
import websockets
class TimeTicker:
is_stopped = False
def __new__(cls, _loop, _uri, interval=1):
instance = super().__new__(cls)
instance.interval = interval
instance.uri = _uri
instance.task = _loop.create_task(instance.run())
return instance.task
async def run(self):
async with websockets.connect(self.uri) as self.ws:
while True:
await self.do()
await asyncio.sleep(self.interval)
async def do(self):
message = 'ping'
await self.ws.send(message)
r = await self.ws.recv()
print(r)
def stop(self):
self.task.cancel()
self.is_stopped = True
uri = "ws://echo.websocket.org"
loop = asyncio.get_event_loop()
task = TimeTicker(loop, uri, interval=5)
loop.run_until_complete(task)
Upvotes: 1
Views: 2152
Reputation: 6355
You don't need threading modules if you are using asyncio.
Here is an example how to do something periodically using asyncio. The only thing - you'll need to have variable with always open connection without context manager.
import asyncio
class TimeTicker:
is_stopped = False
def __new__(cls, _loop, _ws, interval=1):
instance = super().__new__(cls)
instance.interval = interval
instance.ws = _ws
instance.task = _loop.create_task(instance.run())
return instance.task
async def run(self):
while True:
await self.do()
await asyncio.sleep(self.interval)
async def do(self):
message = 'ping'
await self.ws.send(message)
r = await self.ws.recv()
print(r)
def stop(self):
self.task.cancel()
self.is_stopped = True
uri = "ws://echo.websocket.org"
ws = websockets.connect(uri)
loop = asyncio.get_event_loop()
task = TimeTicker(loop, ws, interval=5)
loop.run_until_complete(task)
Upvotes: 3