Let's say you have a hardware device that sends you updates at more or less random intervals, and a client wants to receive an event every time this happens. How would you write a PyTango (the Python wrapper for the Tango controls library) PyTango.server.Device class that simulates this, using a thread that pushes new attribute values?
Upvotes: 2
The answer seems to be:

1. Use the PyTango.server.Device method set_change_event() to tell Tango that the device takes care of pushing its own change events, without needing a polling loop.
2. Call the PyTango.server.Device method push_change_event() from the update thread.

This seems thread-safe so far; I have not seen any malarkey even at very high update rates. It would be nice if someone could confirm that.

Herewith an example server (Python 2.7) using the externally updated randomnumber attribute:
import logging
import time
import threading
import random

from PyTango.server import server_run
from PyTango.server import Device, DeviceMeta
from PyTango.server import attribute, command
from PyTango import AttrQuality

# update_loop() reports exceptions through this logger
logger = logging.getLogger(__name__)


class ExternallyUpdated(Device):
    __metaclass__ = DeviceMeta

    def __init__(self, *args, **kwargs):
        super(ExternallyUpdated, self).__init__(*args, **kwargs)
        # Cached as (value, timestamp, quality)
        self._randomnumber = (0, 0, AttrQuality.ATTR_VALID)
        # Tell Tango that we don't need a polling loop, we'll
        # push change events explicitly
        self.set_change_event('randomnumber', True)

    @attribute(label="Random Number", dtype=int,
               # Enables update events for absolute changes >= 1
               abs_change=1)
    def randomnumber(self):
        return self._randomnumber

    def init_device(self):
        super(ExternallyUpdated, self).init_device()
        # Daemon thread, so it does not keep the server process alive
        self.t = threading.Thread(target=self.update_loop)
        self.t.setDaemon(True)
        self.t.start()

    def update_loop(self):
        while True:
            try:
                new_number = random.randint(0, 10000)
                ts = time.time()
                sleeptime = random.random()*10
                print ('Timestamp: {:.5f} New value: {} sleeptime: {}'
                       .format(ts, new_number, sleeptime))
                # Need to cache the value so that clients can poll the attribute
                self._randomnumber = (new_number, ts, AttrQuality.ATTR_VALID)
                self.push_change_event(
                    'randomnumber', new_number, ts, AttrQuality.ATTR_VALID)
                time.sleep(sleeptime)
            except Exception:
                logger.exception('Exception in update loop')
                time.sleep(1)


if __name__ == "__main__":
    server_run([ExternallyUpdated])
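The update loop above runs forever in a daemon thread. If the thread ever needs to be stopped cleanly, one option is to wait on a threading.Event instead of calling time.sleep(). The following is only a minimal sketch of that idea using the Python 3 standard library, not PyTango code: `StoppableUpdater` and its `push` argument are hypothetical names, with `push` standing in for a call to push_change_event().

```python
import random
import threading
import time


class StoppableUpdater(object):
    """Hypothetical helper: calls push(value, timestamp) until stopped."""

    def __init__(self, push, max_sleep=10.0):
        self._push = push                      # stand-in for push_change_event
        self._max_sleep = max_sleep
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def start(self):
        self._thread.start()

    def stop(self, timeout=None):
        # Wake the loop immediately, then wait for the thread to finish
        self._stop_event.set()
        self._thread.join(timeout)

    def is_alive(self):
        return self._thread.is_alive()

    def _run(self):
        while not self._stop_event.is_set():
            self._push(random.randint(0, 10000), time.time())
            # wait() returns early as soon as stop() is called
            self._stop_event.wait(random.random() * self._max_sleep)
```

In a device like the one above, such a helper could presumably be started from init_device() and stopped from delete_device(); that wiring is an assumption and has not been tested here.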
And an example client is below, assuming you exported the device as so_example/external/1. It should print a message every time randomnumber is updated.
import time
import logging

import PyTango

logger = logging.getLogger()
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(module)s - '
           '%(pathname)s : %(lineno)d - %(message)s',
    level=logging.INFO)

device_name = 'so_example/external/1'
td = PyTango.DeviceProxy(device_name)
attr_name = 'randomnumber'


# Set up a listener
def printer(event_data):
    try:
        print(event_data)  # A PyTango.EventData instance
    except Exception:
        # Not handling exceptions seems to break the event update loop. Or I was
        # doing something else stupid, must still investigate :)
        logger.exception('Exception while handling event, event_data: {}'
                         .format(event_data))


poll_event_id = td.subscribe_event(
    attr_name, PyTango.EventType.CHANGE_EVENT, printer)

# Do something that blocks here, or just run in an interactive session
# time.sleep(60)

# This is how you would unsubscribe from the events.
# td.unsubscribe_event(poll_event_id)
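Since an unhandled exception in the callback seemed to break event delivery, one possible precaution is to keep the callback trivial: it only puts each event on a queue.Queue, and the events are processed elsewhere. The sketch below uses only the Python 3 standard library; `make_enqueuing_callback` and `drain` are hypothetical helper names, and any object can stand in for a PyTango.EventData instance.

```python
import queue


def make_enqueuing_callback(event_queue):
    """Return a callback that only enqueues the event it receives."""
    def callback(event_data):
        event_queue.put(event_data)
    return callback


def drain(event_queue, handler, timeout=0.1):
    """Process queued events until the queue stays empty for `timeout` seconds.

    Returns the number of events handled without error.
    """
    handled = 0
    while True:
        try:
            event = event_queue.get(timeout=timeout)
        except queue.Empty:
            return handled
        try:
            handler(event)
            handled += 1
        except Exception as exc:
            # A failing handler must not stop the processing loop
            print('Error handling {!r}: {}'.format(event, exc))
```

With PyTango, the returned callback would be passed to td.subscribe_event() in place of printer, and drain() (or a similar loop) would serve as the blocking part of the client.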
Upvotes: 2