I have a Flask web app in which I want to keep a persistent connection to an AWS Neptune graph database. This connection is established as follows:
from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
neptune_endpt = 'db-instance-x.xxxxxxxxxx.xx-xxxxx-x.neptune.amazonaws.com'
remoteConn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')
self.g = traversal().withRemote(remoteConn)
The issue I'm facing is that the connection automatically drops off if left idle, and I cannot find a way to detect if the connection has dropped off (so that I can reconnect by using the code snippet above).
I have seen a similar question, "Gremlin server withRemote connection closed - how to reconnect automatically?", but it has no solution. This similar question has no answer either.
I've tried the following two solutions (both of which did not work):
Before using self.g to do some traversal on my graph, I try to "refresh" the connection, by which I mean this:
def _refresh_neptune(self):
    try:
        self.g = traversal().withRemote(self.conn)
    except:
        self.conn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')
        self.g = traversal().withRemote(self.conn)
Here self.conn was initialized as:
self.conn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')
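A likely reason the refresh does not help: traversal().withRemote(self.conn) only builds a traversal source locally and never contacts the server, so the except branch is not reached. As the traceback below shows, the error only surfaces when a terminal step such as toList() submits the bytecode. The following is a minimal sketch that moves the reconnect to execution time instead. ReconnectingSource, connect and is_disconnect are hypothetical names; connect is assumed to return a (connection, g) pair, and the wrapper retries the query once after reconnecting.

```python
class ReconnectingSource:
    """Hypothetical helper that owns a connection and traversal source and
    rebuilds both when a query fails with a disconnect error."""

    def __init__(self, connect, is_disconnect):
        # connect: zero-argument callable returning (connection, g)
        # is_disconnect: predicate deciding whether an exception means "reconnect"
        self._connect = connect
        self._is_disconnect = is_disconnect
        self.conn, self.g = connect()

    def reconnect(self):
        try:
            self.conn.close()  # best effort: the old connection may already be dead
        except Exception:
            pass
        self.conn, self.g = self._connect()

    def run(self, query):
        # query: callable taking g and returning results,
        # e.g. lambda g: g.V().limit(1).toList()
        try:
            return query(self.g)
        except Exception as exc:
            if not self._is_disconnect(exc):
                raise  # unrelated errors are not retried
        # Only reached on a disconnect: rebuild the connection and retry once.
        self.reconnect()
        return query(self.g)
```

In the Flask app, connect could create the DriverRemoteConnection with the wss:// URL from the question and return it together with traversal().withRemote(conn), and is_disconnect could check for the "Server disconnected" message. Because the query may run twice, it should be built inside the callable and preferably be read-only, since retrying a mutation may apply it twice.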
Is there any way to get around this connection error?
Thanks
Update: Added the error message below:
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 58, in toList
    return list(iter(self))
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 48, in __next__
    self.traversal_strategies.apply_strategies(self)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 573, in apply_strategies
    traversal_strategy.apply(traversal)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/remote_connection.py", line 149, in apply
    remote_traversal = self.remote_connection.submit(traversal.bytecode)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/driver_remote_connection.py", line 56, in submit
    results = result_set.all().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/resultset.py", line 90, in cb
    f.result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 83, in _receive
    status_code = self._protocol.data_received(data, self._results)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 81, in data_received
    'message': 'Server disconnected - please try to reconnect', 'attributes': {}})
gremlin_python.driver.protocol.GremlinServerError: 500: Server disconnected - please try to reconnect
I'm not sure this is the best way to solve it, but I'm also using gremlin-python with Neptune and had the same issue. I worked around it by implementing a custom Transport that can be passed to DriverRemoteConnection through transport_factory:
DriverRemoteConnection(
    url=endpoint,
    traversal_source=self._traversal_source,
    transport_factory=Transport
)
gremlin-python returns connections to the pool on exception, and the exception raised when a connection is closed is GremlinServerError, which is also raised for other errors (see gremlin_python/driver/connection.py#L69 and gremlin_python/driver/protocol.py#L80).
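Since GremlinServerError is also raised for unrelated failures, one option is to inspect its message before deciding to reconnect. A minimal sketch, assuming str(exc) has the "<code>: <message>" form seen in the question's traceback (500: Server disconnected - please try to reconnect); is_server_disconnect is a hypothetical helper, not part of gremlin-python:

```python
def is_server_disconnect(exc):
    """Hypothetical helper: True if exc looks like gremlin-python's
    '500: Server disconnected - please try to reconnect' error.

    Assumes str(exc) is formatted as '<status code>: <message>'.
    """
    code, sep, message = str(exc).partition(": ")
    # Require the separator, status 500 and the specific disconnect message.
    return bool(sep) and code.strip() == "500" and message.startswith("Server disconnected")
```

It would typically be called inside an except gremlin_python.driver.protocol.GremlinServerError block, so that only this specific error triggers a reconnect and everything else is re-raised. Matching on message text is fragile across driver versions, which is one reason a transport-level signal is attractive.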
The custom transport is the same as gremlin-python's TornadoTransport, but the read and write methods are extended to:
- reopen the connection before writing, if the underlying websocket has closed;
- raise a StreamClosedError if read_message returns None, which means the stream has closed.
This way, dead connections that are returned to the pool can be reopened, and you can handle the StreamClosedError to apply some retry logic. I did it by overriding the submit and submitAsync methods in DriverRemoteConnection, but you could catch and retry anywhere.
from tornado import httpclient, ioloop, websocket
from tornado.iostream import StreamClosedError
from gremlin_python.driver.transport import AbstractBaseTransport


class Transport(AbstractBaseTransport):
    def __init__(self):
        self._ws = None
        self._loop = ioloop.IOLoop(make_current=False)
        self._url = None
        # Because the transport will try to reopen the underlying ws connection,
        # track whether close() has been called to prevent the transport
        # from reopening.
        self._explicit_closed = True

    @property
    def closed(self):
        # No socket yet, or Tornado has dropped the protocol: treat as closed.
        return self._ws is None or not self._ws.protocol

    def connect(self, url, headers=None):
        self._explicit_closed = False
        # Set the endpoint URL
        self._url = httpclient.HTTPRequest(url, headers=headers) if headers else url
        # Open the connection
        self._connect()

    def write(self, message):
        # Before writing, try to ensure that the connection is open.
        if self.closed:
            self._connect()
        self._loop.run_sync(lambda: self._ws.write_message(message, binary=True))

    def read(self):
        result = self._loop.run_sync(self._ws.read_message)
        # If the read call returns None, the stream has closed.
        if result is None:
            self._ws.close()  # Ensure we close the stream
            raise StreamClosedError()
        return result

    def close(self):
        if self._ws is not None:
            self._ws.close()
        self._loop.close()
        self._explicit_closed = True

    def _connect(self):
        # If close() was called explicitly on the transport, don't allow
        # subsequent calls to write() to reopen the connection.
        if self._explicit_closed:
            raise TransportClosedError(
                "Transport has been closed and can not be reopened."
            )
        # Check whether the ws is closed; if it is not, close it.
        if self._ws and not self.closed:
            self._ws.close()
        # Open the ws connection
        self._ws = self._loop.run_sync(
            lambda: websocket.websocket_connect(url=self._url)
        )


class TransportClosedError(Exception):
    pass
This also works with gremlin-python's connection pooling.
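For the "catch and retry anywhere" option, here is a minimal sketch of a retry helper with exponential backoff. call_with_retry is a hypothetical name; with the Transport above, retry_on would presumably be (tornado.iostream.StreamClosedError,). It relies on the behaviour described in this answer: a dead pooled connection fails once and is reopened on its next write.

```python
import time


def call_with_retry(fn, retry_on, attempts=3, delay=0.1):
    """Hypothetical helper: call fn(), retrying when it raises one of retry_on.

    attempts is the total number of calls; the wait doubles after each failure.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(delay * 2 ** (attempt - 1))  # simple exponential backoff
```

For example, call_with_retry(lambda: g.V().limit(1).toList(), (StreamClosedError,)) rebuilds the traversal on every attempt. With a pool of N connections that all went idle, up to N consecutive calls could hit a dead connection, so attempts should probably be larger than the pool size.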
If you don't need pooling, an alternative approach is to set the pool size to 1 and implement some form of keep-alive, as discussed in TINKERPOP-2352.
It looks like WebSocket ping/keep-alive is not implemented in gremlin-python yet (TINKERPOP-1886).
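For the pool-size-1 keep-alive approach, a minimal sketch of a background pinger. KeepAlive is a hypothetical helper; ping is any cheap query, for example lambda: g.inject(1).toList() (an assumption; any lightweight traversal would do), and interval must be shorter than whatever idle timeout is dropping the connection, which is not stated here.

```python
import threading


class KeepAlive:
    """Hypothetical helper: calls ping() every `interval` seconds on a daemon
    thread so the connection does not sit idle long enough to be dropped."""

    def __init__(self, ping, interval, on_error=None):
        self._ping = ping
        self._interval = interval
        self._on_error = on_error
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def stop(self, timeout=None):
        self._stop.set()
        self._thread.join(timeout)

    def _run(self):
        # Event.wait returns True once stop() is called, which ends the loop.
        while not self._stop.wait(self._interval):
            try:
                self._ping()
            except Exception as exc:  # keep pinging; report the failure if asked
                if self._on_error is not None:
                    self._on_error(exc)
```

The on_error hook is where a reconnect could be triggered if a ping itself fails. With pool_size=1 passed to DriverRemoteConnection, the pings and the app's own queries share the single pooled connection, so every ping keeps that one connection active.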