Abhimanyu Talwar

Reputation: 83

Gremlin Python - "Server disconnected - please try to reconnect" error

I have a Flask web app in which I want to keep a persistent connection to an AWS Neptune graph database. This connection is established as follows:

from gremlin_python.process.anonymous_traversal import traversal
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection

neptune_endpt = 'db-instance-x.xxxxxxxxxx.xx-xxxxx-x.neptune.amazonaws.com'
remoteConn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')
self.g = traversal().withRemote(remoteConn)

The issue I'm facing is that the connection drops automatically if left idle, and I cannot find a way to detect that it has dropped (so that I can reconnect using the code snippet above).

I have seen this similar question: "Gremlin server withRemote connection closed - how to reconnect automatically?", but it has no solution. This similar question has no answer either.

I've tried the following two solutions, neither of which worked:

  1. I set up my web app behind four Gunicorn workers with a timeout of 100 seconds, hoping that worker restarts would take care of Gremlin timeouts.
  2. I tried catching exceptions to detect whether the connection has dropped. Every time I use self.g to do some traversal on my graph, I try to "refresh" the connection, by which I mean this:
def _refresh_neptune(self):
    try:
        # withRemote() only attaches the connection to the traversal source;
        # it does not contact the server, so a dead connection is not detected here.
        self.g = traversal().withRemote(self.conn)
    except Exception:
        self.conn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin', 'g')
        self.g = traversal().withRemote(self.conn)

Here self.conn was initialized as:

self.conn = DriverRemoteConnection(f'wss://{neptune_endpt}:8182/gremlin','g')

Is there any way to get around this connection error?

Thanks

Update: Added the error message below:

  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 58, in toList
    return list(iter(self))
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 48, in __next__
    self.traversal_strategies.apply_strategies(self)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/process/traversal.py", line 573, in apply_strategies
    traversal_strategy.apply(traversal)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/remote_connection.py", line 149, in apply
    remote_traversal = self.remote_connection.submit(traversal.bytecode)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/driver_remote_connection.py", line 56, in submit
    results = result_set.all().result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/resultset.py", line 90, in cb
    f.result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/usr/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/usr/lib/python3.6/concurrent/futures/thread.py", line 56, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/connection.py", line 83, in _receive
    status_code = self._protocol.data_received(data, self._results)
  File "/home/ubuntu/.virtualenvs/rundev/lib/python3.6/site-packages/gremlin_python/driver/protocol.py", line 81, in data_received
    'message': 'Server disconnected - please try to reconnect', 'attributes': {}})
gremlin_python.driver.protocol.GremlinServerError: 500: Server disconnected - please try to reconnect

Upvotes: 3

Views: 2413

Answers (1)

Nathan

Reputation: 31

I am not sure that this is the best way to solve this, but I'm also using gremlin-python and Neptune and I've had the same issue. I worked around it by implementing a Transport that you can provide to DriverRemoteConnection.

DriverRemoteConnection(
    url=endpoint,
    traversal_source=self._traversal_source,
    transport_factory=Transport
)

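gremlin-python returns connections to the pool when an exception occurs, and the exception raised when a connection has been closed is GremlinServerError, which is also raised for other errors. So a dead connection can go back into the pool, and a closed connection is hard to tell apart from other server errors.

See gremlin_python/driver/connection.py#L69 and gremlin_python/driver/protocol.py#L80.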
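If you want to react to the drop without a custom transport, one blunt option is to look at the error text, since the exception type alone is ambiguous. The sketch below is a hypothetical helper, not part of gremlin-python: it runs a query function and, if the error message contains "Server disconnected" (the wording from the traceback in the question), calls a caller-supplied `reconnect` function once and retries. Matching on message text is fragile and assumes that wording stays the same across gremlin-python versions.

```python
from typing import Callable, TypeVar

T = TypeVar("T")

# Wording taken from the traceback in the question (assumed stable; it may change between versions).
DISCONNECT_MARKER = "Server disconnected"


def is_disconnect_error(exc: BaseException) -> bool:
    """Heuristic: treat an error as a dropped connection if its message mentions a disconnect."""
    return DISCONNECT_MARKER in str(exc)


def run_with_reconnect(query: Callable[[], T], reconnect: Callable[[], None]) -> T:
    """Run query(); on a disconnect-looking error, call reconnect() once and retry.

    Any other exception is re-raised unchanged.
    """
    try:
        return query()
    except Exception as exc:
        if not is_disconnect_error(exc):
            raise
        reconnect()
        return query()
```

In the Flask app from the question, `reconnect` could close `self.conn`, create a new `DriverRemoteConnection` and rebind `self.g`, while `query` would be a lambda such as `lambda: self.g.V().limit(1).toList()`. The lambda matters: it looks up `self.g` at call time, so the retry uses the new traversal source.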

The custom transport is the same as gremlin-python's TornadoTransport, but the read and write methods are extended to:

  • Reopen closed connections, if the web socket client is closed
  • Raise a StreamClosedError, if the web socket client returns None from read_message

Dead connections that are returned to the pool can then be reopened, and you can handle the StreamClosedError to apply some retry logic. I did this by overriding the submit and submitAsync methods in DriverRemoteConnection, but you could catch and retry anywhere.

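from tornado import httpclient, ioloop, websocket
from tornado.iostream import StreamClosedError

from gremlin_python.driver.transport import AbstractBaseTransport


class Transport(AbstractBaseTransport):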
    def __init__(self):
        self._ws = None
        self._loop = ioloop.IOLoop(make_current=False)
        self._url = None

        # Because the transport will try to reopen the underlying ws connection
        # track if the closed() method has been called to prevent the transport
        # from reopening.
        self._explicit_closed = True

    @property
    def closed(self):
        # Treat "never connected" the same as "closed" so write() can (re)connect.
        return self._ws is None or not self._ws.protocol

    def connect(self, url, headers=None):
        self._explicit_closed = False

        # Set the endpoint URL
        self._url = httpclient.HTTPRequest(url, headers=headers) if headers else url

        # Open the connection
        self._connect()

    def write(self, message):
        # Before writing, try to ensure that the connection is open.
        if self.closed:
            self._connect()

        self._loop.run_sync(lambda: self._ws.write_message(message, binary=True))

    def read(self):
        result = self._loop.run_sync(self._ws.read_message)

        # If the read call returns None, the stream has closed.
        if result is None:
            self._ws.close()  # Ensure we close the stream
            raise StreamClosedError()

        return result

    def close(self):
        self._ws.close()
        self._loop.close()
        self._explicit_closed = True

    def _connect(self):
        # If close() was called explicitly on the transport, don't allow
        # subsequent calls to write() to reopen the connection.
        if self._explicit_closed:
            raise TransportClosedError(
                "Transport has been closed and can not be reopened."
            )

        # Check if the ws is closed, if it is not, close it.
        if self._ws and not self.closed:
            self._ws.close()

        # Open the ws connection
        self._ws = self._loop.run_sync(
            lambda: websocket.websocket_connect(url=self._url)
        )


class TransportClosedError(Exception):
    pass
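The retry logic described above (catching StreamClosedError once the transport has closed a dead socket) could be expressed with a small generic helper like the one below. It is a sketch, not the author's actual submit/submitAsync override: `call_with_retry` is a hypothetical name, `retry_on` would be `(tornado.iostream.StreamClosedError,)` in this setup, and the default of two attempts is an arbitrary assumption. Per the answer, the retried call picks up a pooled connection whose transport reopens its WebSocket in write().

```python
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 2,
) -> T:
    """Call fn(), making up to `attempts` calls in total while it raises one of `retry_on`.

    The last exception is re-raised once all attempts are used up.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise
    raise AssertionError("unreachable")
```

For example: `call_with_retry(lambda: g.V().limit(1).toList(), (StreamClosedError,))`. Keeping the retry outside the driver avoids depending on the exact signatures of `submit`/`submitAsync`, which differ between gremlin-python releases.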

This also works with gremlin-python's connection pooling.

If you don't need pooling, an alternative approach is to set the pool size to 1 and implement some form of keep-alive, as discussed in TINKERPOP-2352.

It looks like WebSocket ping/keep-alive is not implemented in gremlin-python yet (TINKERPOP-1886).
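Until driver-level pings exist, a client-side keep-alive could be as simple as a background thread that periodically runs a cheap query. This is a sketch under assumptions: the `KeepAlive` class is hypothetical, the interval has to be shorter than whatever idle timeout applies to your Neptune connection (not stated here), and `ping` would be something like `lambda: g.inject(1).toList()` on a connection created with `pool_size=1` (assuming your gremlin-python version's `DriverRemoteConnection` accepts that argument).

```python
import threading
from typing import Callable, List


class KeepAlive:
    """Call `ping` every `interval` seconds on a daemon thread until stop() is called."""

    def __init__(self, ping: Callable[[], object], interval: float) -> None:
        self._ping = ping
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.errors: List[BaseException] = []

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def _run(self) -> None:
        # Event.wait() returns True as soon as stop() sets the event, which ends the loop;
        # otherwise it times out after `interval` seconds and we send another ping.
        while not self._stop.wait(self._interval):
            try:
                self._ping()
            except Exception as exc:
                # Keep the thread running; record the failure so the app can reconnect.
                self.errors.append(exc)
```

In a Flask app this would be started once at startup and stopped at shutdown. With a pool of one connection, the ping and request traffic share the same socket, so any successful query also keeps it active.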

Upvotes: 3
