Dr. Paprika

Reputation: 154

Architecture for data acquisition and processing

I am sharpening my Python skills and have started learning about websockets as an educational tool. To that end, I'm working with real-time data received every millisecond via a websocket. I would like to separate its acquisition, processing, and plotting in a clean and comprehensible way. Acquisition and processing are time-critical, whereas the plot only needs to be updated every ~100 ms.

A) I am assuming that the raw data arrives at a constant rate, every ms.

B) If processing isn't quick enough (>1ms), skip the data that arrived while busy and stay synced with A)

C) Every ~100ms or so, get the last processed data and plot it.

I guess that a Minimal Working Example would start like this:

import threading

class ReceiveData(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def receive(self):
        pass


class ProcessData(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def process(self):
        pass


class PlotData(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def plot(self):
        pass

Starting with this (is it even the right way to go?), how can I pass the raw data from ReceiveData to ProcessData, and periodically to PlotData? How can I keep the executions synced, and repeat the calls every 1 ms or 100 ms?

Thank you.

Upvotes: 0

Views: 299

Answers (1)

Gerd

Reputation: 2813

I think your general approach with threads for receiving and processing the data is fine. For communication between the threads, I would suggest a producer-consumer approach. Here is a complete example using a Queue as a data structure.
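As a rough illustration of the producer-consumer idea with a Queue, here is a minimal sketch. The names `producer`, `consumer`, and `SENTINEL` are made up for this example, and the integers stand in for the real websocket samples. Note that a Queue keeps every item, so the consumer sees all of them in order.

```python
import queue
import threading

q = queue.Queue()
SENTINEL = None  # hypothetical marker telling the consumer to stop


def producer(n):
    """Put n dummy samples on the queue, then signal the end."""
    for i in range(n):
        q.put(i)
    q.put(SENTINEL)


def consumer(out):
    """Block until an item is available and collect it."""
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        out.append(item)


results = []
threads = [
    threading.Thread(target=producer, args=(5,)),
    threading.Thread(target=consumer, args=(results,)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```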

In your case you want to skip unprocessed data and use only the most recent element. To achieve this, collections.deque (see documentation) with a maximum length of 1 might be a better choice for you - see also this discussion.

import collections

d = collections.deque(maxlen=1)  # holds at most one item; older ones are discarded

The producer side would then append data to the deque like this:

d.append(item)
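To see why maxlen=1 gives the "skip what arrived while busy" behaviour, here is a small sketch with made-up sample values: each append on a full deque silently discards the previous item, so only the newest one is left to read.

```python
import collections

d = collections.deque(maxlen=1)

# Three hypothetical samples arrive before anyone reads the deque.
for sample in (1, 2, 3):
    d.append(sample)

latest = d.pop()  # only the most recent sample is still there
```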

And the main loop on the consumer side might look like this:

while True:
    try:
        item = d.pop()
        print('Getting item ' + str(item))
    except IndexError:
        print('Deque is empty')
    # time.sleep(s) if you want to poll the latest data every s seconds

Possibly, you can merge the ReceiveData and ProcessData functionalities into just one class / thread and use only one deque between this class and PlotData.
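Put together, the merged variant could look roughly like the sketch below. It is only an outline under some assumptions: `acquire_and_process`, `plot_loop`, and `run_demo` are hypothetical names, a counter replaces the websocket, doubling the value is a placeholder for the real processing, and appending to a list stands in for the plot update. Also keep in mind that time.sleep(0.001) will not give you an exact 1 ms period.

```python
import collections
import threading
import time


def acquire_and_process(latest, stop, n_samples, period=0.001):
    """Stand-in for the websocket reader: receive, process, publish."""
    for raw in range(n_samples):
        processed = raw * 2          # placeholder processing step
        latest.append(processed)     # replaces any value not yet read
        time.sleep(period)
    stop.set()                       # tell the plotting thread to finish


def plot_loop(latest, stop, seen, period=0.1):
    """Roughly every `period` seconds, take the newest value if there is one."""
    while not stop.is_set():
        try:
            seen.append(latest.pop())   # a real plot update would go here
        except IndexError:
            pass                        # nothing new since the last poll
        stop.wait(period)               # sleeps, but returns early on stop


def run_demo(n_samples=300):
    latest = collections.deque(maxlen=1)
    stop = threading.Event()
    seen = []
    workers = [
        threading.Thread(target=acquire_and_process,
                         args=(latest, stop, n_samples)),
        threading.Thread(target=plot_loop, args=(latest, stop, seen)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return seen
```

Calling run_demo() returns the few values the plotting side actually picked up; everything produced in between was dropped, which matches point B) of the question.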

Upvotes: 1
