siamii

Reputation: 24134

How to reconnect after an Autobahn WebSocket timeout?

I'm using Autobahn to connect to a WebSocket like this:

from twisted.internet.defer import inlineCallbacks
from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner

class MyComponent(ApplicationSession):

    @inlineCallbacks
    def onJoin(self, details):
        print("session ready")

        # handler invoked for every event published to the topic
        def oncounter(*args, **args2):
            print("event received: args: {} args2: {}".format(args, args2))

        try:
            yield self.subscribe(oncounter, u'topic')
            print("subscribed to topic")
        except Exception as e:
            print("could not subscribe to topic: {0}".format(e))

if __name__ == '__main__':
    addr = u"wss://mywebsocketaddress.com"
    runner = ApplicationRunner(url=addr, realm=u"realm1", debug=False, debug_app=False)
    runner.run(MyComponent)

This works well, and I am able to receive messages. However, after around 3-4 hours (sometimes much sooner), the messages abruptly stop coming. It appears that the WebSocket connection times out (can that happen?), possibly because of network issues.

How can I auto-reconnect with autobahn when that happens?


Here's my attempt, but the reconnecting code is never called.

class MyClientFactory(ReconnectingClientFactory, WampWebSocketClientFactory):

    maxDelay = 10
    maxRetries = 5

    def startedConnecting(self, connector):
        print('Started to connect.')

    def clientConnectionLost(self, connector, reason):
        print('Lost connection. Reason: {}'.format(reason))
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)

    def clientConnectionFailed(self, connector, reason):
        print('Connection failed. Reason: {}'.format(reason))
        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)

class MyApplicationRunner(object):

    log = txaio.make_logger()

    def __init__(self, url, realm, extra=None, serializers=None,
                 debug=False, debug_app=False,
                 ssl=None, proxy=None):

        assert(type(url) == six.text_type)
        assert(realm is None or type(realm) == six.text_type)
        assert(extra is None or type(extra) == dict)
        assert(proxy is None or type(proxy) == dict)
        self.url = url
        self.realm = realm
        self.extra = extra or dict()
        self.serializers = serializers
        self.debug = debug
        self.debug_app = debug_app
        self.ssl = ssl
        self.proxy = proxy

    def run(self, make, start_reactor=True):
        if start_reactor:
            # only select framework, set loop and start logging when we are asked
            # to start the reactor - otherwise we are running in a program that
            # likely already took care of all this.
            from twisted.internet import reactor
            txaio.use_twisted()
            txaio.config.loop = reactor

            if self.debug or self.debug_app:
                txaio.start_logging(level='debug')
            else:
                txaio.start_logging(level='info')

        isSecure, host, port, resource, path, params = parseWsUrl(self.url)

        # factory for the user's ApplicationSession
        def create():
            cfg = ComponentConfig(self.realm, self.extra)
            try:
                session = make(cfg)
            except Exception as e:
                if start_reactor:
                    # the app component could not be created .. fatal
                    self.log.error(str(e))
                    reactor.stop()
                else:
                    # if we didn't start the reactor, it's up to the
                    # caller to deal with errors
                    raise
            else:
                session.debug_app = self.debug_app
                return session

        # create a WAMP-over-WebSocket transport client factory
        transport_factory = MyClientFactory(create, url=self.url, serializers=self.serializers,
                                                       proxy=self.proxy, debug=self.debug)

        # suppress pointless log noise like
        # "Starting factory <autobahn.twisted.websocket.WampWebSocketClientFactory object at 0x2b737b480e10>"
        transport_factory.noisy = False

        # if user passed ssl= but isn't using isSecure, we'll never
        # use the ssl argument which makes no sense.
        context_factory = None
        if self.ssl is not None:
            if not isSecure:
                raise RuntimeError(
                    'ssl= argument value passed to %s conflicts with the "ws:" '
                    'prefix of the url argument. Did you mean to use "wss:"?' %
                    self.__class__.__name__)
            context_factory = self.ssl
        elif isSecure:
            from twisted.internet.ssl import optionsForClientTLS
            context_factory = optionsForClientTLS(host)

        from twisted.internet import reactor
        if self.proxy is not None:
            from twisted.internet.endpoints import TCP4ClientEndpoint
            client = TCP4ClientEndpoint(reactor, self.proxy['host'], self.proxy['port'])
            transport_factory.contextFactory = context_factory
        elif isSecure:
            from twisted.internet.endpoints import SSL4ClientEndpoint
            assert context_factory is not None
            client = SSL4ClientEndpoint(reactor, host, port, context_factory)
        else:
            from twisted.internet.endpoints import TCP4ClientEndpoint
            client = TCP4ClientEndpoint(reactor, host, port)

        d = client.connect(transport_factory)

        # as the reactor shuts down, we wish to wait until we've sent
        # out our "Goodbye" message; leave() returns a Deferred that
        # fires when the transport gets to STATE_CLOSED
        def cleanup(proto):
            if hasattr(proto, '_session') and proto._session is not None:
                if proto._session.is_attached():
                    return proto._session.leave()
                elif proto._session.is_connected():
                    return proto._session.disconnect()

        # when our proto was created and connected, make sure it's cleaned
        # up properly later on when the reactor shuts down for whatever reason
        def init_proto(proto):
            reactor.addSystemEventTrigger('before', 'shutdown', cleanup, proto)
            return proto

        # if we connect successfully, the arg is a WampWebSocketClientProtocol
        d.addCallback(init_proto)

        # if the user didn't ask us to start the reactor, then they
        # get to deal with any connect errors themselves.
        if start_reactor:
            # if an error happens in the connect(), we save the underlying
            # exception so that after the event-loop exits we can re-raise
            # it to the caller.

            class ErrorCollector(object):
                exception = None

                def __call__(self, failure):
                    self.exception = failure.value
                    reactor.stop()
            connect_error = ErrorCollector()
            d.addErrback(connect_error)

            # now enter the Twisted reactor loop
            reactor.run()

            # if we exited due to a connection error, raise that to the
            # caller
            if connect_error.exception:
                raise connect_error.exception

        else:
            # let the caller handle any errors
            return d

The error I'm getting:

2016-10-09T21:00:40+0100 Connection to/from tcp4:xxx.xx.xx.xx:xxx was lost in a non-clean fashion: Connection lost
2016-10-09T21:00:40+0100 _connectionLost: [Failure instance: Traceback (failure with no frames): : Connection to the other side was lost in a non-clean fashion: Connection lost.
]
2016-10-09T21:00:40+0100 WAMP-over-WebSocket transport lost: wasClean=False, code=1006, reason="connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)"
2016-10-09T21:10:39+0100 EXCEPTION:  no messages received
2016-10-09T21:10:39+0100 Traceback (most recent call last):

Upvotes: 4

Views: 4564

Answers (3)

sunwarr10r

Reputation: 4797

Have you tried pip3 install autobahn-autoreconnect? Then import ApplicationRunner from that package instead of from Autobahn:

# from autobahn.asyncio.wamp import ApplicationRunner
from autobahn_autoreconnect import ApplicationRunner

Upvotes: 1

oberstet

Reputation: 22051

Here is an example of an automatically reconnecting ApplicationRunner. The important line to enable auto-reconnect is:

runner.run(session, auto_reconnect=True)

You will also want to activate automatic WebSocket ping/pong (e.g. in Crossbar.io), both to minimize connection drops caused by timeouts and to allow fast detection of lost connections.

Upvotes: 0

notorious.no

Reputation: 5107

You can use a ReconnectingClientFactory if you're using Twisted. There's a simple example by the Autobahn developer on GitHub. Unfortunately, there doesn't seem to be an ApplicationRunner implementation that comes pre-built with this functionality, but it doesn't look too difficult to implement on your own. Here's an asyncio variant which should be straightforward to port to Twisted. You could follow this issue, as it seems the dev team wants to incorporate a reconnecting client.

Upvotes: 1
