ASuitee
ASuitee

Reputation: 11

Can't send message through autobahn(twisted) websocket from an other thread

I am trying to use a Autobahn Twisted websocket with a ROS node. If they are both used in the main thread, ROS and twisted seemed to block each other so I chose to run my ROS node in a separate thread.

I need to send a message through the socket from this ros thread. I use the reactor.callFromThread method in order to achieve this.

However I get this error when the function is called:

Traceback (most recent call last):
  File "/home/sco10/ROS/sco10_ws/install/s_teleop/lib/s_teleop/external_com.py", line 220, in <module>
    main()
  File "/home/sco10/ROS/sco10_ws/install/s_teleop/lib/s_teleop/external_com.py", line 212, in main
    reactor.run()
  File "/usr/lib/python3/dist-packages/twisted/internet/base.py", line 1267, in run
    self.mainLoop()
  File "/usr/lib/python3/dist-packages/twisted/internet/base.py", line 1276, in mainLoop
    self.runUntilCurrent()
--- <exception caught here> ---
  File "/usr/lib/python3/dist-packages/twisted/internet/base.py", line 875, in runUntilCurrent
    f(*a, **kw)
  File "/usr/lib/python3/dist-packages/autobahn/websocket/protocol.py", line 2166, in sendMessage
    if self.state != WebSocketProtocol.STATE_OPEN:
builtins.AttributeError: type object 'WebSocketServerProtocol' has no attribute 'state'

Here is my code:

q = queue.Queue()
qr = queue.Queue()


class MyServerProtocol(WebSocketServerProtocol):

    def __init__(self):
        super(MyServerProtocol, self).__init__()


    def onConnect(self, request):
        print("Client connecting: {}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        try:
            obj = json.loads(payload.decode('utf8'))
            q.put(obj)
            resp = qr.get(timeout=1)
            payload = resp.encode('utf8')
            self.sendMessage(payload, isBinary=False)
        except:
            print("Unable to convert to json")

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {}".format(reason))
        
        
class MyServerProtocolFactory(WebSocketServerFactory):

    def __init__(self):
        super(MyServerProtocolFactory, self).__init__()

    def buildProtocol(self, *args, **kwargs):
        protocol = MyServerProtocol()
        protocol.factory = self
        return protocol



class RosNode(Node):

    def send_data(self, msg, name, msg_type):
        if msg_type == "service response":
            qr.put(msg)
        if msg_type == "subscriber":
            reactor.callFromThread(self.factory.protocol.sendMessage, self.factory.protocol, msg.encode('utf8'))

    def receive_data(self):
        msg = q.get()
        # Process msg hereq = queue.Queue()
qr = queue.Queue()


class MyServerProtocol(WebSocketServerProtocol):

    def __init__(self):
        super(MyServerProtocol, self).__init__()


    def onConnect(self, request):
        print("Client connecting: {}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        try:
            obj = json.loads(payload.decode('utf8'))
            q.put(obj)
            resp = qr.get(timeout=1)
            payload = resp.encode('utf8')
            self.sendMessage(payload, isBinary=False)
        except:
            print("Unable to convert to json")

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {}".format(reason))
        
        
class MyServerProtocolFactory(WebSocketServerFactory):

    def __init__(self):
        super(MyServerProtocolFactory, self).__init__()

    def buildProtocol(self, *args, **kwargs):
        protocol = SuiteeServerProtocol()
        protocol.factory = self
        return protocol



class RosNode(Node):

    def send_data(self, msg, name, msg_type):
        if msg_type == "service response":
            qr.put(msg)
        if msg_type == "subscriber":
            reactor.callFromThread(self.factory.protocol.sendMessage, self.factory.protocol, msg.encode('utf8'))

    def receive_data(self):
        msg = q.get()
        # Process msg here
    
    def callback(self, msg):
        # Process msg coming from ros 
        self.send_data(msg, name, msg_type)

def ros_tread(node):
    while rclpy.ok():
        rclpy.spin_once(node)



def main(args=None):
    rclpy.init(args=sys.argv)
    factory = MyServerProtocolFactory()
    myContextFactory = ssl.DefaultOpenSSLContextFactory('cert.key',
                                                        'cert.crt')
    ctx = myContextFactory.getContext()
    # certificate = ssl.PrivateCertificate.load(certData, keyData,format=crypto.FILETYPE_PEM)
    rn = RosNode(factory)
    t = threading.Thread(target=ros_tread, args=(rn,))
    t.start()

    reactor.listenSSL(9000, factory, myContextFactory)
    reactor.run()

    st.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    """ main """
    main()

I don't understand why i get this error as WebSocketServerProtocol is supposed to inherit from WebSocketProtocol that contains a "state" attribute.

Anyone as some clue on this ?

Upvotes: 1

Views: 187

Answers (0)

Related Questions