Reputation: 24134
I'm using Autobahn to connect to a websocket like this.
class MyComponent(ApplicationSession):
@inlineCallbacks
def onJoin(self, details):
print("session ready")
def oncounter(*args, **args2):
print("event received: args: {} args2: {}".format(args, args2))
try:
yield self.subscribe(oncounter, u'topic')
print("subscribed to topic")
except Exception as e:
print("could not subscribe to topic: {0}".format(e))
if __name__ == '__main__':
addr = u"wss://mywebsocketaddress.com"
runner = ApplicationRunner(url=addr, realm=u"realm1", debug=False, debug_app=False)
runner.run(MyComponent)
This works great, and I am able to receive messages. However, after around 3-4 hours, sometimes much sooner, the messages abruptly stop coming. It appears that the websocket times out (does that happen?), possibly due to connection issues.
How can I auto-reconnect with autobahn when that happens?
Here's my attempt, but the reconnecting code is never called.
class MyClientFactory(ReconnectingClientFactory, WampWebSocketClientFactory):
maxDelay = 10
maxRetries = 5
def startedConnecting(self, connector):
print('Started to connect.')
def clientConnectionLost(self, connector, reason):
print('Lost connection. Reason: {}'.format(reason))
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
def clientConnectionFailed(self, connector, reason):
print('Connection failed. Reason: {}'.format(reason))
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
class MyApplicationRunner(object):
log = txaio.make_logger()
def __init__(self, url, realm, extra=None, serializers=None,
debug=False, debug_app=False,
ssl=None, proxy=None):
assert(type(url) == six.text_type)
assert(realm is None or type(realm) == six.text_type)
assert(extra is None or type(extra) == dict)
assert(proxy is None or type(proxy) == dict)
self.url = url
self.realm = realm
self.extra = extra or dict()
self.serializers = serializers
self.debug = debug
self.debug_app = debug_app
self.ssl = ssl
self.proxy = proxy
def run(self, make, start_reactor=True):
if start_reactor:
# only select framework, set loop and start logging when we are asked
# start the reactor - otherwise we are running in a program that likely
# already tool care of all this.
from twisted.internet import reactor
txaio.use_twisted()
txaio.config.loop = reactor
if self.debug or self.debug_app:
txaio.start_logging(level='debug')
else:
txaio.start_logging(level='info')
isSecure, host, port, resource, path, params = parseWsUrl(self.url)
# factory for use ApplicationSession
def create():
cfg = ComponentConfig(self.realm, self.extra)
try:
session = make(cfg)
except Exception as e:
if start_reactor:
# the app component could not be created .. fatal
self.log.error(str(e))
reactor.stop()
else:
# if we didn't start the reactor, it's up to the
# caller to deal with errors
raise
else:
session.debug_app = self.debug_app
return session
# create a WAMP-over-WebSocket transport client factory
transport_factory = MyClientFactory(create, url=self.url, serializers=self.serializers,
proxy=self.proxy, debug=self.debug)
# supress pointless log noise like
# "Starting factory <autobahn.twisted.websocket.WampWebSocketClientFactory object at 0x2b737b480e10>""
transport_factory.noisy = False
# if user passed ssl= but isn't using isSecure, we'll never
# use the ssl argument which makes no sense.
context_factory = None
if self.ssl is not None:
if not isSecure:
raise RuntimeError(
'ssl= argument value passed to %s conflicts with the "ws:" '
'prefix of the url argument. Did you mean to use "wss:"?' %
self.__class__.__name__)
context_factory = self.ssl
elif isSecure:
from twisted.internet.ssl import optionsForClientTLS
context_factory = optionsForClientTLS(host)
from twisted.internet import reactor
if self.proxy is not None:
from twisted.internet.endpoints import TCP4ClientEndpoint
client = TCP4ClientEndpoint(reactor, self.proxy['host'], self.proxy['port'])
transport_factory.contextFactory = context_factory
elif isSecure:
from twisted.internet.endpoints import SSL4ClientEndpoint
assert context_factory is not None
client = SSL4ClientEndpoint(reactor, host, port, context_factory)
else:
from twisted.internet.endpoints import TCP4ClientEndpoint
client = TCP4ClientEndpoint(reactor, host, port)
d = client.connect(transport_factory)
# as the reactor shuts down, we wish to wait until we've sent
# out our "Goodbye" message; leave() returns a Deferred that
# fires when the transport gets to STATE_CLOSED
def cleanup(proto):
if hasattr(proto, '_session') and proto._session is not None:
if proto._session.is_attached():
return proto._session.leave()
elif proto._session.is_connected():
return proto._session.disconnect()
# when our proto was created and connected, make sure it's cleaned
# up properly later on when the reactor shuts down for whatever reason
def init_proto(proto):
reactor.addSystemEventTrigger('before', 'shutdown', cleanup, proto)
return proto
# if we connect successfully, the arg is a WampWebSocketClientProtocol
d.addCallback(init_proto)
# if the user didn't ask us to start the reactor, then they
# get to deal with any connect errors themselves.
if start_reactor:
# if an error happens in the connect(), we save the underlying
# exception so that after the event-loop exits we can re-raise
# it to the caller.
class ErrorCollector(object):
exception = None
def __call__(self, failure):
self.exception = failure.value
reactor.stop()
connect_error = ErrorCollector()
d.addErrback(connect_error)
# now enter the Twisted reactor loop
reactor.run()
# if we exited due to a connection error, raise that to the
# caller
if connect_error.exception:
raise connect_error.exception
else:
# let the caller handle any errors
return d
The error I'm getting:
2016-10-09T21:00:40+0100 Connection to/from tcp4:xxx.xx.xx.xx:xxx was lost in a non-clean fashion: Connection lost 2016-10-09T21:00:40+0100 _connectionLost: [Failure instance: Traceback (failure with no frames): : Connection to the other side was lost in a non-clean fashion: Connection l ost. ] 2016-10-09T21:00:40+0100 WAMP-over-WebSocket transport lost: wasClean=False, code=1006, reason="connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)" 2016-10-09T21:10:39+0100 EXCEPTION: no messages received 2016-10-09T21:10:39+0100 Traceback (most recent call last):
Upvotes: 4
Views: 4564
Reputation: 4797
Have you tried pip3 install autobahn-autoreconnect
?
# from autobahn.asyncio.wamp import ApplicationRunner
from autobahn_autoreconnect import ApplicationRunner
Upvotes: 1
Reputation: 22051
Here is an example of an automatically reconnecting ApplicationRunner. The important line to enable auto-reconnect is:
runner.run(session, auto_reconnect=True)
You will also want to activate automatic WebSocket ping/pong (eg in Crossbar.io), to a) minimize connection drops because of timeouts, and b) to allow fast detection of lost connections.
Upvotes: 0
Reputation: 5107
You can use a ReconnectingClientFactory
if you're using Twisted. There's a simple example by the autobahn developer on github. Unfortunately there doesn't seem to be an implementation of an ApplicationRunner
that comes pre-built with this functionality but it doesn't look too difficult to implement on your own. Here's an asyncio
variant which should be straight forward to Twisted. You could follow this issue as it seems there is a desire by the dev team to incorporate reconnecting client.
Upvotes: 1