dcsan

Reputation: 12305

Simple example of working with the neo4j Python driver?

Is there a simple example of working with the neo4j Python driver? How do I just pass a Cypher query to the driver to run and get back a cursor?

If I'm reading this example correctly, the demo has a class wrapper with a private member function that I pass to session.write_transaction:

session.write_transaction(self._create_and_return_greeting, ...

That function is then called with a transaction as its first parameter...

def _create_and_return_greeting(tx, message):

which in turn runs the Cypher:

result = tx.run("CREATE (a:Greeting) "

This seems 10X more complicated than it needs to be.

I did try something simpler:

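def raw_query(query, **kwargs):
    neodriver = neo_connect()  # cached dbconn
    with neodriver.session() as session:
        try:
            result = session.run(query, **kwargs)
            # data() pulls every record while the session is still open
            return result.data()
        except neo4j.exceptions.CypherSyntaxError as err:
            logging.error('neo error %s', err)
            raise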

But this results in a socket error on the query, probably because the session goes out of scope?

[dfcx/__init__] ERROR | Underlying socket connection gone (_ssl.c:2396)

[dfcx/__init__] ERROR | Failed to write data to connection IPv4Address(('neo4j-core-8afc8558-3.production-orch-0042.neo4j.io', 7687)) (IPv4Address(('34.82.120.138', 7687)))

Also, I can't return a cursor/iterator, only the output of data(). When the session goes out of scope, the query result seems to die with it.

If I manually open and close a session, wouldn't I have the same problems?

Python must be the most popular language this DB is used with; does everyone use a different driver? Py2neo seems cute, but it completely lacks ORM wrapper functions for most Cypher language features, so you have to drop down to raw Cypher anyway. And I'm not sure it supports **kwargs argument interpolation in the same way.

I guess that big raise should help iron out some kinks :D

Slightly longer version trying to get a working DB wrapper:

import logging
from typing import Optional, Union

import neo4j

# module-level driver cache; NEOCONFIG (a dict with 'url', 'user' and
# 'pass' keys) is assumed to be defined elsewhere in the app
raw_driver: Optional[Union[neo4j.BoltDriver, neo4j.Neo4jDriver]] = None


def neo_connect() -> Union[neo4j.BoltDriver, neo4j.Neo4jDriver]:

    global raw_driver
    if raw_driver is not None:
        # print('reuse driver')
        return raw_driver

    neoconfig = NEOCONFIG
    driver = neo4j.GraphDatabase.driver(
        neoconfig['url'], auth=(
            neoconfig['user'], neoconfig['pass']))
    # driver() never returns None; check reachability instead, and only
    # cache the driver once that check has passed
    driver.verify_connectivity()
    raw_driver = driver
    return raw_driver


def raw_query(query, **kwargs):
    # just get data, no cursor
    neodriver = neo_connect()
    session = neodriver.session()
    # logging.info('neoquery %s', query)
    try:
        result = session.run(query, **kwargs)
        # pull all records before the session is closed below
        data = result.data()
        return data

    except neo4j.exceptions.CypherSyntaxError as err:
        logging.error('neo error %s', err)
        logging.error('failed query: %s', query)
        raise
    finally:
        # always release the session's connection back to the pool
        session.close()

Update: someone pointed me to this example, which shows another way to use the tx wrapper:

https://github.com/neo4j-graph-examples/northwind/blob/main/code/python/example.py#L16-L21

Upvotes: 3


Answers (1)

Robsdedude

Reputation: 1403

def raw_query(query, **kwargs):
    neodriver = neo_connect()  # cached dbconn
    with neodriver.session() as session:
        try:
            result = session.run(query, **kwargs)
            return result.data()
        except neo4j.exceptions.CypherSyntaxError as err:
            logging.error('neo error %s', err)
            raise

This is perfectly fine and works as intended on my end.
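If what you want is closer to a cursor, one option is a generator that keeps the session open while the caller iterates. This is a minimal sketch, not a driver API: stream_query is a hypothetical helper name, and driver is assumed to be an already-created neo4j driver (for example the one returned by the question's neo_connect()).

```python
from typing import Any, Dict, Iterator


def stream_query(driver, query: str, **params: Any) -> Iterator[Dict[str, Any]]:
    """Yield one dict per record while the session stays open.

    Hypothetical helper: `driver` is assumed to be a neo4j driver. Nothing
    runs until the caller starts iterating, and the session is closed when
    the generator is exhausted or explicitly closed.
    """
    with driver.session() as session:
        result = session.run(query, **params)
        for record in result:
            # Record.data() turns a single record into a plain dict
            yield record.data()
```

You would consume it with a plain for loop over stream_query(driver, "MATCH ... RETURN ..."). The trade-off is that the session (and its connection) stays checked out until the loop finishes, so a caller that stops early should call .close() on the generator or wrap it in contextlib.closing.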

The error you're seeing indicates a connection problem, so something must be going on between the server and the driver that is outside of the driver's influence.

Also, please note that there is a difference between these two ways of running a query:

  • with driver.session() as session:
        result = session.run("<SOME CYPHER>")
    
  • def work(tx):
        result = tx.run("<SOME CYPHER>")
    
    with driver.session() as session:
        session.write_transaction(work)
    
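A runnable sketch of the two styles, with each one actually returning its records. The function names are hypothetical; driver is assumed to be a neo4j driver. It uses session.execute_write, the 5.x name for a managed write transaction; 4.x drivers call the same thing write_transaction.

```python
from typing import Any, Dict, List


def run_auto_commit(driver, query: str, **params: Any) -> List[Dict[str, Any]]:
    # Auto-commit transaction: session.run() sends the query as its own
    # unit of work. data() pulls the records before the with-block closes
    # the session.
    with driver.session() as session:
        return session.run(query, **params).data()


def run_managed_write(driver, query: str, **params: Any) -> List[Dict[str, Any]]:
    # Managed transaction: the driver begins a transaction, calls work(tx),
    # commits when it returns, and may call it again on transient errors.
    def work(tx):
        # Extract the records inside the function; the result becomes
        # invalid once the transaction ends.
        return tx.run(query, **params).data()

    with driver.session() as session:
        return session.execute_write(work)
```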

The transaction-function version might be 3 lines longer, and the team working on the drivers has collected some feedback about this. However, there are more things to consider here. Firstly, changing the API surface needs careful planning and cannot be done in, say, a patch release. Secondly, there are technical hurdles to overcome. Here are the semantics, anyway:

  • Auto-commit transaction. Runs only that query as one unit of work. If you run a new auto-commit transaction within the same session, the previous result will buffer all remaining records for you (depending on the query, this can consume a lot of memory). This can be avoided by calling result.consume(). Note, however, that if the session goes out of scope, the result is consumed automatically, which means you cannot extract further records from it. Lastly, any error will be raised and needs handling in the application code.
  • Managed transaction. Runs whatever unit of work you want within that function. A transaction is implicitly started around the function and committed when it returns (unless you roll back explicitly). Once the transaction ends (end of function or rollback), the result is consumed and becomes invalid, so you have to extract all the records you need before that.
    This is the recommended way of using the driver, because it does not raise every error but handles some internally (where appropriate) and retries the work function (e.g. if the server is only temporarily unavailable). Since the function might be executed multiple times, you must make sure it is idempotent.
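To illustrate the idempotency point, here is a sketch of a transaction function that is safe to retry, reusing the Greeting example from the question. merge_greeting is a hypothetical name, and using MERGE here is an illustrative choice, not something the driver requires. It would be called as session.execute_write(merge_greeting, "hello") on a 5.x driver (write_transaction on 4.x); both pass the extra arguments through to the function.

```python
def merge_greeting(tx, message: str) -> str:
    """Transaction function that the driver can safely run more than once.

    MERGE (rather than CREATE) matches an existing Greeting node with the
    same message before creating one, so a retry does not leave a
    duplicate node behind.
    """
    record = tx.run(
        "MERGE (g:Greeting {message: $message}) RETURN g.message AS message",
        message=message,
    ).single()
    # Extract the value now: the result is invalid once the transaction ends.
    return record["message"]
```

Note that MERGE on its own is not race-free across concurrent transactions; a uniqueness constraint on the property is what guarantees a single node.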

Closing thoughts:
Please remember that Stack Overflow is monitored on a best-effort basis, and what can be perceived as hasty comments may get in the way of getting helpful answers to your questions.

Update 2025

For single-query transactions, you might want to have a look at driver.execute_query, which is functionally equivalent to transaction functions (the second code snippet I gave, which uses def work(tx): ...), but is more concise and potentially allows the driver to perform optimizations with the extra knowledge about the structure of the transaction.

driver.execute_query(
    "<SOME CYPHER>",
    # specify whether to route the query to a reader or writer in the cluster
    routing_=neo4j.RoutingControl.READ,  # or just "r"
    # specify the DB name, if known for better performance
    database_="neo4j",
)
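A slightly fuller sketch of the same call, assuming a recent 5.x driver: query parameters can be passed as extra keyword arguments (their names must not end in an underscore), and the returned EagerResult unpacks into records, summary and keys. The Greeting label, the database name "neo4j" and the helper name fetch_greetings are placeholders.

```python
from typing import List


def fetch_greetings(driver, limit: int = 10) -> List[str]:
    # execute_query wraps the query in a managed transaction and returns an
    # EagerResult, which unpacks into (records, summary, keys).
    records, _summary, _keys = driver.execute_query(
        "MATCH (g:Greeting) RETURN g.message AS message LIMIT $limit",
        limit=limit,        # extra keyword arguments become query parameters
        routing_="r",       # read query, so it may go to a reader
        database_="neo4j",  # assumed database name
    )
    return [record["message"] for record in records]
```

Because the records are fetched eagerly, there is no session or result object left for the caller to keep alive, which sidesteps the scoping problem from the question.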

Upvotes: 2
