user169808

Reputation: 513

Sending data to a thread in python

I'm trying to thread the following code and send data to it (at random intervals), but I can't figure out how. At the moment I'm saving all the data to a txt file and reading it back from there, which isn't working very well. Is it possible to create a function that sends data to a specific thread (like: SendDataToThread(data, ThreadNumber))? And how would I go about reading the data that was sent? I've seen a few solutions using queue, but I was unable to understand them. Here is the script I am temporarily using to plot the graph, which I found here. Sorry if the question seems simple, but I've never had to mess with threading or matplotlib before.

import matplotlib.pyplot as plt
from threading import Thread
plt.ion()
class DynamicUpdate():
    #Suppose we know the x range
    min_x = 0
    max_x = 10

    def on_launch(self):
        #Set up plot
        self.figure, self.ax = plt.subplots()
        self.lines, = self.ax.plot([],[], 'o')
        #Autoscale on unknown axis and known lims on the other
        self.ax.set_autoscaley_on(True)
        self.ax.set_xlim(self.min_x, self.max_x)
        #Other stuff
        self.ax.grid()
        ...

    def on_running(self, xdata, ydata):
        #Update data (with the new _and_ the old points)
        self.lines.set_xdata(xdata)
        self.lines.set_ydata(ydata)
        #Need both of these in order to rescale
        self.ax.relim()
        self.ax.autoscale_view()
        #We need to draw *and* flush
        self.figure.canvas.draw()
        self.figure.canvas.flush_events()
    #Example
    def __call__(self):
        # read/plot data
        pass

Upvotes: 2

Views: 2000

Answers (2)

AS Mackay

Reputation: 2847

Here's some example code which shows how to do several of the things that were asked about. This uses multithreading rather than multiprocessing, and shows some examples of using queues, starting/stopping worker threads and updating a matplotlib plot with additional data.

(Part of the code comes from answers to other questions including this one and this one.)

The code shows a possible implementation of an asynchronous worker, to which data can be sent for subsequent processing. The worker uses an internal queue to buffer the data, and an internal thread (loop) that reads data from the queue, does some processing and sends the result for display.
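
The worker idea described above can be condensed into a short sketch. This is written for Python 3 (where the module is named queue rather than Queue), and the names Worker, send_data_to_thread and the _STOP sentinel are made up for illustration, loosely following the SendDataToThread(data, ThreadNumber) shape from the question. It is a sketch under those assumptions, not the implementation in the full listing.

```python
import queue
import threading


class Worker:
    """Hypothetical worker: owns a queue and a thread that consumes it."""

    _STOP = object()  # sentinel object telling the thread to exit

    def __init__(self, ident):
        self.ident = ident
        self.inbox = queue.Queue()   # thread-safe buffer for incoming data
        self.results = []            # appended to only by the worker thread
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def send(self, data):
        """Called from any thread to hand data to this worker."""
        self.inbox.put(data)

    def _loop(self):
        while True:
            item = self.inbox.get()  # blocks until something arrives
            if item is self._STOP:
                break
            self.results.append(item * 2)  # stand-in for real processing

    def stop(self):
        """Ask the thread to finish the queued items and wait for it."""
        self.inbox.put(self._STOP)
        self.thread.join()


def send_data_to_thread(workers, data, thread_number):
    """Route data to one specific worker, chosen by its index."""
    workers[thread_number].send(data)


workers = [Worker(i) for i in range(3)]
send_data_to_thread(workers, 21, 1)
send_data_to_thread(workers, 5, 1)
for w in workers:
    w.stop()
print(workers[1].results)  # [42, 10]
```

Because each worker has its own queue, "sending to thread N" reduces to putting an item on the Nth queue. The blocking get() means the thread sleeps until data arrives instead of polling a file.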

An asynchronous plotter implementation is also shown. Results can be sent to this plotter from multiple workers. (This also uses an internal queue for buffering; this is done to allow the main program thread itself to call the function that updates the plot, which appears to be a requirement with matplotlib.)

NB This was written for Python 2.7 on OSX. Hope some of it may be useful.

import time
import threading
import Queue
import math
import matplotlib.pyplot as plt

class AsynchronousPlotter:
    """
    Updates a matplotlib data plot asynchronously. 
    Uses an internal queue to buffer results passed for plotting in x, y pairs.
    NB the output_queued_results() function is intended to be called periodically
    from the main program thread, to update the plot with any waiting results.
    """

    def output_queued_results(self):
        """
        Plots any waiting results. Should be called from main program thread.
        Items for display are x, y pairs
        """
        while not self.queue.empty():
            item = self.queue.get()
            x, y = item
            self.add_point(x, y)
            self.queue.task_done()

    def queue_result_for_output(self, x, y):
        """
        Queues an x, y pair for display. Called from worker threads, so intended 
        to be thread safe.
        """
        self.lock.acquire(True)
        self.queue.put([x, y]) 
        self.lock.release()

    def redraw(self):
        self.ax.relim()
        self.ax.autoscale_view()
        self.fig.canvas.draw()
        plt.pause(0.001)

    def add_point(self, x, y):
        self.xdata.append(x)
        self.ydata.append(y)
        self.lines.set_xdata(self.xdata)
        self.lines.set_ydata(self.ydata)
        self.redraw()

    def __init__(self):
        self.xdata=[]
        self.ydata=[]
        self.fig = plt.figure()
        self.ax = self.fig.add_subplot(111)
        self.lines, = self.ax.plot(self.xdata, self.ydata, 'o')
        self.ax.set_autoscalex_on(True)   
        self.ax.set_autoscaley_on(True)   
        plt.ion()
        plt.show()
        self.lock = threading.Lock()
        self.queue = Queue.Queue()

class AsynchronousWorker:
    """
    Processes data asynchronously. 
    Uses an internal queue and internal thread to handle data passed in.
    Does some processing on the data in the internal thread, and then
    sends result to an asynchronous plotter for display
    """

    def queue_data_for_processing(self, raw_data):
        """
        Queues data for processing by the internal thread.
        """
        self.queue.put(raw_data)

    def _worker_loop(self):
        """
        The internal thread loop. Runs until the exit signal is set.
        Processes the supplied raw data into something ready
        for display. 
        """
        while True:
            try:
                # check for any data waiting in the queue
                raw_data = self.queue.get(True, 1)
                # process the raw data, and send for display
                # in this trivial example, change circle radius -> area
                x, y = raw_data
                y = y**2 * math.pi   
                self.ap.queue_result_for_output(x, y)
                self.queue.task_done()
            except Queue.Empty:
                pass
            finally:
                if self.esig.is_set():
                    return

    def hang_up(self):
        self.esig.set()    # set the exit signal...
        self.loop.join()   # ... and wait for thread to exit

    def __init__(self, ident, ap):
        self.ident = ident
        self.ap = ap
        self.esig = threading.Event()
        self.queue = Queue.Queue()
        self.loop = threading.Thread(target=self._worker_loop)
        self.loop.start()

if __name__ == "__main__":     
    ap = AsynchronousPlotter()
    num_workers = 5   # use this many workers

    # create some workers. Give each worker some ID and tell it 
    # where it can find the output plotter
    workers = []
    for worker_number in range (num_workers):
        workers.append(AsynchronousWorker(worker_number, ap)) 

    # supply some data to the workers
    for worker_number in range (num_workers):
        circle_number = worker_number
        circle_radius = worker_number * 4
        workers[worker_number].queue_data_for_processing([circle_number, circle_radius])

    # wait for workers to finish then tell the plotter to plot the results
    # in a longer-running example we would update the plot every few seconds
    time.sleep(2)
    ap.output_queued_results()

    # Wait for user to hit return, and clean up workers
    raw_input("Hit Return...")
    for worker in workers:
        worker.hang_up()
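
For readers on Python 3, a rough sketch of the same main-thread draining idea follows. It assumes only the standard library: Queue is imported as queue in Python 3, and queue.Queue does its own locking, so a separate lock around put() should not be needed. The helper names produce, drain and collect_points are hypothetical, and the plotting call is left as a comment so the sketch runs without a display.

```python
import queue
import threading


def produce(q, worker_id, count):
    """Worker side: push (x, y) pairs onto the shared queue."""
    for i in range(count):
        q.put((worker_id * 10 + i, i * i))


def drain(q):
    """Main-thread side: take everything that is waiting, without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def collect_points(num_workers=3, points_each=4):
    q = queue.Queue()
    threads = [threading.Thread(target=produce, args=(q, k, points_each))
               for k in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # In a plotting program the main thread would call drain() periodically
    # and pass each pair to something like add_point(x, y).
    return drain(q)


points = collect_points()
print(len(points))  # 12
```

Using get_nowait() and catching queue.Empty avoids the gap between checking empty() and calling get(), which only matters if more than one thread consumes from the same queue.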

Upvotes: 2

user169808

Reputation: 513

I improved the code a bit: I can now pass a value to it when it is created, which is good. With multiprocessing, though, I can't figure out how to make the plot show. When I call the plot without multiprocessing it works, so it might be something simple that I can't see. I'm also trying to study the code you linked to, but it's not very clear to me. I'm also trying to save the processes to a list so that later I can send data directly to a process while it is running (I think this is done with a pipe, but I'm not sure).

import matplotlib.pyplot as plt
from multiprocessing import Process

plt.ion()
class DynamicUpdate():
    #Suppose we know the x range
    min_x = 0
    max_x = 10
    def __init__(self, x):
        self.number = x
        
    def on_launch(self):
        #Set up plot
        self.figure, self.ax = plt.subplots()
        self.lines, = self.ax.plot([],[], 'o')
        #Autoscale on unknown axis and known lims on the other
        self.ax.set_autoscaley_on(True)
        self.ax.set_xlim(self.min_x, self.max_x)
        #Other stuff
        self.ax.grid()
        ...
        
    def on_running(self, xdata, ydata):
        #Update data (with the new _and_ the old points)
        self.lines.set_xdata(xdata)
        self.lines.set_ydata(ydata)
        #Need both of these in order to rescale
        self.ax.relim()
        self.ax.autoscale_view()
        #We need to draw *and* flush
        self.figure.canvas.draw()
        self.figure.canvas.flush_events()

    #Example
    def __call__(self):
        print(self.number)
        
        import numpy as np
        import time
        self.on_launch()
        xdata = []
        ydata = []
        for x in np.arange(0,10,0.5):
            xdata.append(x)
            ydata.append(np.exp(-x**2)+10*np.exp(-(x-7)**2))
            self.on_running(xdata, ydata)
            time.sleep(1)
        return xdata, ydata

_processes_=[]
for i in range(0,2):
    _processes_.append(Process(target=DynamicUpdate(i)))
    p = Process(target=_processes_[i])
    p.start()
    # tried adding p.join(), but it didn't change anything
    p.join()
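
One possible way to send data to a process while it is running, sketched under assumptions: give each process its own multiprocessing.Queue and have the child block on get(). The names plot_worker, run_demo and the None stop sentinel are hypothetical, and the plotting calls are left as a comment so the sketch runs without a display. Note also that the loop above passes a Process object as the target of a second Process and joins inside the loop. A Process object is not callable, which may be why nothing appears.

```python
import multiprocessing as mp

STOP = None  # assumed sentinel meaning "no more data"


def plot_worker(number, inbox, outbox):
    """Runs in the child: receive (x, y) points until STOP arrives."""
    xdata, ydata = [], []
    while True:
        item = inbox.get()  # blocks until the parent sends something
        if item is STOP:
            break
        x, y = item
        xdata.append(x)
        ydata.append(y)
        # A real version would create the figure inside this function and
        # call something like on_running(xdata, ydata) here.
    outbox.put((number, len(xdata)))  # report how many points were received


def run_demo(num_processes=2):
    inboxes = [mp.Queue() for _ in range(num_processes)]
    outbox = mp.Queue()
    procs = [mp.Process(target=plot_worker, args=(i, inboxes[i], outbox))
             for i in range(num_processes)]
    for p in procs:
        p.start()  # start them all before joining any
    for i, inbox in enumerate(inboxes):
        for x in range(3 + i):
            inbox.put((x, x * x))  # send data to one specific process
        inbox.put(STOP)
    counts = dict(outbox.get() for _ in procs)  # read results before joining
    for p in procs:
        p.join()
    return counts


if __name__ == "__main__":
    print(run_demo())
```

Keeping the processes and their queues in lists means "send to process i" is just inboxes[i].put(...). A multiprocessing.Pipe could be used in much the same way when there is exactly one sender and one receiver.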

Upvotes: 0
