Reputation: 513
I'm trying to run the following code in a thread and send data to it (at random intervals), but I can't figure out how. At the moment I'm saving all the data to a txt file and reading it back from there, which isn't working very well. Is it possible to create a function that sends data to a specific thread (like: SendDataToThread(data, ThreadNumber))? And how would I go about reading the data that was sent? I've seen a few solutions using queue, but I was unable to understand them. Here is the script I am temporarily using to plot the graph, which I found here. Sorry if the question seems simple, but I've never had to work with threading or matplotlib before.
import matplotlib.pyplot as plt
from threading import Thread
plt.ion()
class DynamicUpdate():
    #Suppose we know the x range
    min_x = 0
    max_x = 10

    def on_launch(self):
        #Set up plot
        self.figure, self.ax = plt.subplots()
        self.lines, = self.ax.plot([],[], 'o')
        #Autoscale on unknown axis and known lims on the other
        self.ax.set_autoscaley_on(True)
        self.ax.set_xlim(self.min_x, self.max_x)
        #Other stuff
        self.ax.grid()
        ...

    def on_running(self, xdata, ydata):
        #Update data (with the new _and_ the old points)
        self.lines.set_xdata(xdata)
        self.lines.set_ydata(ydata)
        #Need both of these in order to rescale
        self.ax.relim()
        self.ax.autoscale_view()
        #We need to draw *and* flush
        self.figure.canvas.draw()
        self.figure.canvas.flush_events()

    #Example
    def __call__(self):
        # read/plot data
Upvotes: 2
Views: 2000
Reputation: 2847
Here's some example code which shows how to do several of the things that were asked about. This uses multithreading rather than multiprocessing, and shows some examples of using queues, starting/stopping worker threads and updating a matplotlib plot with additional data.
(Part of the code comes from answers to other questions including this one and this one.)
The code shows a possible implementation of an asynchronous worker, to which data can be sent for subsequent processing. The worker uses an internal queue to buffer the data, and an internal thread (loop) that reads data from the queue, does some processing and sends the result for display.
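Stripped of the plotting, the core of such a worker might look like the sketch below. This is only an illustration, not the implementation that follows: the names `MiniWorker`, `send`, `close` and `_STOP` are made up here, and it stops the loop with a sentinel object instead of the exit event and timeout used in the full code. The import fallback is there so that it runs on Python 3 as well as 2.7.

```python
import math
import threading

try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2.7

_STOP = object()  # hypothetical sentinel: tells the loop to exit


class MiniWorker(object):
    """Buffers items in a queue and processes them on an internal thread."""

    def __init__(self, on_result):
        self._inbox = queue.Queue()
        self._on_result = on_result      # callback that receives each result
        self._thread = threading.Thread(target=self._loop)
        self._thread.daemon = True
        self._thread.start()

    def send(self, item):
        """May be called from any thread; Queue.put does its own locking."""
        self._inbox.put(item)

    def close(self):
        """Stop after the items already queued, then wait for the thread."""
        self._inbox.put(_STOP)
        self._thread.join()

    def _loop(self):
        while True:
            item = self._inbox.get()     # blocks until something arrives
            if item is _STOP:
                break
            x, radius = item
            # same trivial processing as the full example: radius -> area
            self._on_result((x, math.pi * radius ** 2))


results = []
worker = MiniWorker(results.append)
for i in range(3):
    worker.send((i, i * 4))
worker.close()
print(results)
```

The sentinel keeps the sketch short; an exit event with a timed `get`, as in the full code, is the better fit when the loop also has to react to a stop request while the queue is idle and nothing can be enqueued.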
An asynchronous plotter implementation is also shown. Results can be sent to this plotter from multiple workers. (This also uses an internal queue for buffering; this is done to allow the main program thread itself to call the function that updates the plot, which appears to be a requirement with matplotlib.)
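The hand-off to the main thread can be sketched on its own, without matplotlib. In this sketch the worker threads only ever touch a shared queue, and the main thread drains it and passes each item to a callback. The names `produce`, `drain` and `handle_point` are hypothetical, and a plain list stands in for the plot's data.

```python
import threading

try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2.7

results = queue.Queue()


def produce(worker_id, count):
    """Runs on a worker thread: only touches the queue, never the plot."""
    for i in range(count):
        results.put((worker_id, i))


def drain(handle_point):
    """Runs on the main thread: hand every waiting item to handle_point."""
    drained = 0
    while True:
        try:
            x, y = results.get_nowait()
        except queue.Empty:
            return drained
        handle_point(x, y)
        drained += 1


threads = [threading.Thread(target=produce, args=(n, 3)) for n in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

points = []  # stand-in for the plot's data
count = drain(lambda x, y: points.append((x, y)))
print(count, sorted(points))
```

In a real program `handle_point` would be something like the plotter's `add_point`, and `drain` would be called periodically from the main loop rather than once after the workers have finished.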
NB This was written for Python 2.7 on OSX. Hope some of it may be useful.
import time
import threading
import Queue
import math
import matplotlib.pyplot as plt
class AsynchronousPlotter:
    """
    Updates a matplotlib data plot asynchronously.
    Uses an internal queue to buffer results passed for plotting in x, y pairs.
    NB the output_queued_results() function is intended to be called periodically
    from the main program thread, to update the plot with any waiting results.
    """

    def output_queued_results(self):
        """
        Plots any waiting results. Should be called from main program thread.
        Items for display are x, y pairs
        """
        while not self.queue.empty():
            item = self.queue.get()
            x, y = item
            self.add_point(x, y)
            self.queue.task_done()

    def queue_result_for_output(self, x, y):
        """
        Queues an x, y pair for display. Called from worker threads, so intended
        to be thread safe.
        """
        # Queue.Queue does its own locking, so this extra lock is not strictly needed
        self.lock.acquire(True)
        self.queue.put([x, y])
        self.lock.release()

    def redraw(self):
        self.ax.relim()
        self.ax.autoscale_view()
        self.fig.canvas.draw()
        plt.pause(0.001)

    def add_point(self, x, y):
        self.xdata.append(x)
        self.ydata.append(y)
        self.lines.set_xdata(self.xdata)
        self.lines.set_ydata(self.ydata)
        self.redraw()

    def __init__(self):
        self.xdata=[]
        self.ydata=[]
        self.fig = plt.figure()
        self.ax = self.fig.add_subplot(111)
        self.lines, = self.ax.plot(self.xdata, self.ydata, 'o')
        self.ax.set_autoscalex_on(True)
        self.ax.set_autoscaley_on(True)
        plt.ion()
        plt.show()
        self.lock = threading.Lock()
        self.queue = Queue.Queue()

class AsynchronousWorker:
    """
    Processes data asynchronously.
    Uses an internal queue and internal thread to handle data passed in.
    Does some processing on the data in the internal thread, and then
    sends result to an asynchronous plotter for display
    """

    def queue_data_for_processing(self, raw_data):
        """
        Queues data for processing by the internal thread.
        """
        self.queue.put(raw_data)

    def _worker_loop(self):
        """
        The internal thread loop. Runs until the exit signal is set.
        Processes the supplied raw data into something ready
        for display.
        """
        while True:
            try:
                # check for any data waiting in the queue
                raw_data = self.queue.get(True, 1)
                # process the raw data, and send for display
                # in this trivial example, change circle radius -> area
                x, y = raw_data
                y = y**2 * math.pi
                self.ap.queue_result_for_output(x, y)
                self.queue.task_done()
            except Queue.Empty:
                pass
            finally:
                if self.esig.is_set():
                    return

    def hang_up(self):
        self.esig.set() # set the exit signal...
        self.loop.join() # ... and wait for thread to exit

    def __init__(self, ident, ap):
        self.ident = ident
        self.ap = ap
        self.esig = threading.Event()
        self.queue = Queue.Queue()
        self.loop = threading.Thread(target=self._worker_loop)
        self.loop.start()

if __name__ == "__main__":
    ap = AsynchronousPlotter()
    num_workers = 5 # use this many workers

    # create some workers. Give each worker some ID and tell it
    # where it can find the output plotter
    workers = []
    for worker_number in range(num_workers):
        workers.append(AsynchronousWorker(worker_number, ap))

    # supply some data to the workers
    for worker_number in range(num_workers):
        circle_number = worker_number
        circle_radius = worker_number * 4
        workers[worker_number].queue_data_for_processing([circle_number, circle_radius])

    # wait for workers to finish then tell the plotter to plot the results
    # in a longer-running example we would update the plot every few seconds
    time.sleep(2)
    ap.output_queued_results()

    # Wait for user to hit return, and clean up workers
    raw_input("Hit Return...")
    for worker in workers:
        worker.hang_up()
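As for the `SendDataToThread(data, ThreadNumber)` function mentioned in the question, one way to get that shape is to give every thread its own queue and look the queue up by number. The following is a minimal sketch under that assumption, with no plotting; `start_thread`, `send_data_to_thread`, `stop_all` and the module-level dictionaries are hypothetical names, and each thread simply records what it receives.

```python
import threading

try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2.7

_STOP = object()   # hypothetical sentinel that ends a thread's loop
inboxes = {}       # thread number -> that thread's private queue
threads = {}       # thread number -> Thread object
received = {}      # thread number -> items seen so far (demo only)


def _listen(thread_number):
    """Loop run by each thread: block on its own queue and handle items."""
    inbox = inboxes[thread_number]
    while True:
        data = inbox.get()
        if data is _STOP:
            return
        received[thread_number].append(data)  # real code would process here


def start_thread(thread_number):
    inboxes[thread_number] = queue.Queue()
    received[thread_number] = []
    t = threading.Thread(target=_listen, args=(thread_number,))
    t.daemon = True
    threads[thread_number] = t
    t.start()


def send_data_to_thread(data, thread_number):
    """Deliver data to one specific thread via that thread's queue."""
    inboxes[thread_number].put(data)


def stop_all():
    for inbox in inboxes.values():
        inbox.put(_STOP)
    for t in threads.values():
        t.join()


for n in range(2):
    start_thread(n)
send_data_to_thread("a", 0)
send_data_to_thread("b", 1)
send_data_to_thread("c", 0)
stop_all()
print(received)
```

The worker class above does the same thing in object form: `workers[n].queue_data_for_processing(data)` plays the role of `send_data_to_thread(data, n)`.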
Upvotes: 2
Reputation: 513
I've improved the code a bit: I can now pass a value to it when it is created, which is good, but with multiprocessing I can't figure out how to make the plot show. When I call the plot without multiprocessing it works, so it might be something simple that I can't see. I'm also trying to study the code you linked to, but it's not very clear to me. In addition, I'm trying to save the processes in a list so that later I can send data directly to a process while it is running (I think this is done with a pipe, but I'm not sure).
import matplotlib.pyplot as plt
from multiprocessing import Process
plt.ion()
class DynamicUpdate():
    #Suppose we know the x range
    min_x = 0
    max_x = 10

    def __init__(self, x):
        self.number = x

    def on_launch(self):
        #Set up plot
        self.figure, self.ax = plt.subplots()
        self.lines, = self.ax.plot([],[], 'o')
        #Autoscale on unknown axis and known lims on the other
        self.ax.set_autoscaley_on(True)
        self.ax.set_xlim(self.min_x, self.max_x)
        #Other stuff
        self.ax.grid()
        ...

    def on_running(self, xdata, ydata):
        #Update data (with the new _and_ the old points)
        self.lines.set_xdata(xdata)
        self.lines.set_ydata(ydata)
        #Need both of these in order to rescale
        self.ax.relim()
        self.ax.autoscale_view()
        #We need to draw *and* flush
        self.figure.canvas.draw()
        self.figure.canvas.flush_events()

    #Example
    def __call__(self):
        print(self.number)
        import numpy as np
        import time
        self.on_launch()
        xdata = []
        ydata = []
        for x in np.arange(0,10,0.5):
            xdata.append(x)
            ydata.append(np.exp(-x**2)+10*np.exp(-(x-7)**2))
            self.on_running(xdata, ydata)
            time.sleep(1)
        return xdata, ydata

_processes_=[]
for i in range(0,2):
    _processes_.append(Process(target=DynamicUpdate(i)))
    p = Process(target=_processes_[i])
    p.start()
# tried adding p.join(), but it didn't change anything
p.join()
Upvotes: 0