Reputation: 85
I'm working on Pepper from SoftBank Robotics and I'm trying to realize a program that is able to integrate a callback function once a "Navigation/AvoidanceNavigator/ObstacleDetected" event goes TRUE.
Of course, I tried to refer the manual but in the Reacting to Events section of the Python SDK there is only an example that uses the FaceDetect event.
Unfortunately, differently from FaceDetect event, which is part of the ALFaceDetection service that has a subscribe method, "Navigation/AvoidanceNavigator/ObstacleDetected" event is associated with ALNavigation API that doesn't have the subscriber.
Someone can help me with this?
Here my first (not working) idea:
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
from naoqi import ALProxy
import qi
import argparse
import sys
import numpy
from PIL import Image
import os
import time
class tryGetEvent(object):
def __init__(self, app):
super(tryGetEvent, self).__init__()
app.start() #start the application (see qi.ApplicationAPI)
session=app.session #return the current session
# Open the service ALMemory
self.memory=session.service("ALMemory")
#Connect to the event callback
self.subscriber=self.memory.subscriber("Navigation/AvoidanceNavigator/ObstacleDetected")
self.subscriber.signal.connect(self.obstacleDet)
self.tts=session.service("ALTextToSpeech")
self.navigation=session.service("ALNavigation")
self.navigation.subscribe("tryGetEvent")
self.got_obst=False
def obstacleDet(self, position):
if position==[]: #empty value
self.got_obst=False
elif not self.got_obst:
self.got_obst=True
print "Obstacle detected!"
self.tts.say("Obstacle detected!")
pos_plot=position [1]
print (pos_plot)
def run(self):
print("Starting script tryGetEvent")
try:
while True:
time.sleep(0.2)
except KeyboardInterrupt:
print("user interrupted the script")
self.navigation.unsubscribe("tryGetEvent")
sys.exit(0)
# DO NOT TOUCH !
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--ip", type=str, default="192.168.1.32",
help="Robot IP address. On robot or Local Naoqi: use '127.0.0.1'.")
parser.add_argument("--port", type=int, default=9559,
help="Naoqi port number")
args = parser.parse_args()
try:
# Initialize qi framework.
connection_url = "tcp://" + args.ip + ":" + str(args.port)
app = qi.Application(["tryGetEvent", "--qi-url=" + connection_url])
except RuntimeError:
print ("Can't connect to Naoqi at ip \"" + args.ip + "\" on port " + str(args.port) +".\n"
"Please check your script arguments. Run with -h option for help.")
sys.exit(1)
Get_Event=tryGetEvent(app)
Get_Event.run()
Upvotes: 1
Views: 545
Reputation: 2342
I am sorry to see that the manual's example is unclear, but there is a simpler way to write this code.
obstacle_tracker.py
:
#! /usr/bin/env python
# -*- encoding: UTF-8 -*-
import qi
class ObstacleTracker:
def __init__(self, session):
self.session = session
self.got_obst = False
memory = session.service("ALMemory")
subscriber = memory.subscriber("Navigation/AvoidanceNavigator/ObstacleDetected")
subscriber.signal.connect(self.obstacleDet)
def obstacleDet(self, position):
if position == []: #empty value
self.got_obst = False
elif not self.got_obst:
self.got_obst = True
print("Obstacle detected!")
tts = self.session.service("ALTextToSpeech")
tts.say("Obstacle detected!")
print(position)
if __name__ == "__main__":
app = qi.Application()
app.start()
obstacle_tracker = ObstacleTracker(app.session)
app.run()
Run it with:
$ python obstacle_tracker.py --qi-url=192.168.1.32
Upvotes: 0