Debajyoti Roy
Debajyoti Roy

Reputation: 995

Concurrent query and delete leads to a memory leak in Neo4j 2.0.3 Community

This program simulates a server that concurrently queries nodes based on user input and deletes them. Each user request (of query and then delete) is being processed on a separate thread.

There are no compile or runtime issues; however, a memory leak of DiffSets leads to an eventual crash. Please advise on fixes/workarounds.

Heap monitored during program execution:

enter image description here

enter image description here

Source code:

package net.ahm.graph;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

public class DeleteLab {
    private static final int CHILDREN = 10000;
    private static final Logger LOG = Logger.getLogger(DeleteLab.class);

    public static void main(String[] args) throws Exception {
        FileUtils.deleteRecursively(new File("graphdb"));
        final GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("graphdb")
                .setConfig(GraphDatabaseSettings.use_memory_mapped_buffers, "true").setConfig(GraphDatabaseSettings.cache_type, "strong")
                .newGraphDatabase();
        registerShutdownHook(graphDb);
        LOG.info(">>>> STARTED GRAPHDB");
        createIndex("Parent", "name", graphDb);
        createIndex("Child", "name", graphDb);
        final Node parent;

        try (Transaction tx = graphDb.beginTx()) {
            parent = graphDb.createNode(DynamicLabel.label("Parent"));
            parent.setProperty("name", "parent");
            tx.success();
        }

        try (Transaction tx = graphDb.beginTx()) {
            for (int i = 0; i < CHILDREN; i++) {
                Node child = graphDb.createNode(DynamicLabel.label("Child"));
                child.setProperty("name", "child" + i);
                child.setProperty("count", i);
                parent.createRelationshipTo(child, RelationshipTypes.PARENT_CHILD);
            }
            tx.success();
        }
        LOG.info(">>>> CREATED NODES");
        final ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
        ExecutorService es = Executors.newFixedThreadPool(50);
        final CountDownLatch cdl = new CountDownLatch(CHILDREN);

        for (int i = 0; i < CHILDREN; i++) {
            final int count = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    try (Transaction tx = graphDb.beginTx()) {
                        tx.acquireWriteLock(parent);
                        Map<String, Object> params = new HashMap<String, Object>();
                        params.put("cCount", count);
                        ExecutionResult result = engine.execute(
                                "match (n:Parent)-[:PARENT_CHILD]->(m:Child) where m.count={cCount} return m.name", params);
                        for (Map<String, Object> row : result) {
                            String cName = (String) row.get("m.name");
                            Node child = findNode("Child", "name", cName, graphDb);
                            Relationship r = child.getSingleRelationship(RelationshipTypes.PARENT_CHILD, Direction.INCOMING);
                            r.delete();
                            child.delete();
                        }
                        tx.success();
                    } finally {
                        cdl.countDown();
                    }
                }
            });
        }
        cdl.await();
        LOG.info(">>>> DELETED NODES");
        es.shutdown();
    }

    private static void createIndex(String label, String propertyName, GraphDatabaseService graphDb) {
        IndexDefinition indexDefinition;
        try (Transaction tx = graphDb.beginTx()) {
            Schema schema = graphDb.schema();
            indexDefinition = schema.indexFor(DynamicLabel.label(label)).on(propertyName).create();
            tx.success();
        }
        try (Transaction tx = graphDb.beginTx()) {
            Schema schema = graphDb.schema();
            schema.awaitIndexOnline(indexDefinition, 10, TimeUnit.SECONDS);
            tx.success();
        }
    }

    private static void registerShutdownHook(final GraphDatabaseService graphDb) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOG.info("### GRAPHDB SHUTDOWNHOOK INVOKED !!!");
                graphDb.shutdown();
            }
        });
    }

    private enum RelationshipTypes implements RelationshipType {
        PARENT_CHILD
    }

    public static Node findNode(String labelName, String propertyName, Object propertyValue, GraphDatabaseService graphDb) {
        if (propertyValue != null) {
            Label label = DynamicLabel.label(labelName);
            ResourceIterable<Node> ri = graphDb.findNodesByLabelAndProperty(label, propertyName, propertyValue);
            if (ri != null) {
                try {
                    ResourceIterator<Node> iter = ri.iterator();
                    try {
                        if (iter != null && iter.hasNext()) {
                            return iter.next();
                        }
                    } finally {
                        iter.close();
                    }
                } catch (Exception e) {
                    LOG.error("ERROR WHILE FINDING ID: " + propertyValue + " , LABEL: " + labelName + " , PROPERTY: " + propertyName, e);
                }
            }
        }
        return null;
    }
}

One plausible solution for the memory leak is to not acquire the write lock in the same transaction in which the Cypher query is executed. Simply catching NotFoundException and retrying the query seems to work fine.

Source code with a plausible fix:

package net.ahm.graph;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

public class DeleteLab {
    private static final int CHILDREN = 10000;
    private static final Logger LOG = Logger.getLogger(DeleteLab.class);

    public static void main(String[] args) throws Exception {
        FileUtils.deleteRecursively(new File("graphdb"));
        final GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("graphdb")
                .setConfig(GraphDatabaseSettings.use_memory_mapped_buffers, "true").setConfig(GraphDatabaseSettings.cache_type, "strong")
                .newGraphDatabase();
        registerShutdownHook(graphDb);
        LOG.info(">>>> STARTED GRAPHDB");
        createIndex("Parent", "name", graphDb);
        createIndex("Child", "name", graphDb);
        final Node parent;

        try (Transaction tx = graphDb.beginTx()) {
            parent = graphDb.createNode(DynamicLabel.label("Parent"));
            parent.setProperty("name", "parent");
            tx.success();
        }

        try (Transaction tx = graphDb.beginTx()) {
            for (int i = 0; i < CHILDREN; i++) {
                Node child = graphDb.createNode(DynamicLabel.label("Child"));
                child.setProperty("name", "child" + i);
                child.setProperty("count", i);
                parent.createRelationshipTo(child, RelationshipTypes.PARENT_CHILD);
            }
            tx.success();
        }
        LOG.info(">>>> CREATED NODES");
        final ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
        ExecutorService es = Executors.newFixedThreadPool(50);
        final CountDownLatch cdl = new CountDownLatch(CHILDREN);

        for (int i = 0; i < CHILDREN; i++) {
            final int count = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    String cName = null;
                    boolean success = false;
                    try (Transaction tx = graphDb.beginTx()) {
                        while (!success) {
                            try {
                                Map<String, Object> params = new HashMap<String, Object>();
                                params.put("cCount", count);
                                ExecutionResult result = engine.execute(
                                        "match (n:Parent)-[:PARENT_CHILD]->(m:Child) where m.count={cCount} return m.name", params);
                                for (Map<String, Object> row : result) {
                                    cName = (String) row.get("m.name");
                                    break;
                                }
                                success = true;
                            } catch (org.neo4j.graphdb.NotFoundException e) {
                                LOG.info(">>>> RETRY QUERY ON NotFoundException: " + count);
                                try {
                                    Thread.sleep((long) Math.random() * 100);
                                } catch (InterruptedException e1) {
                                    e1.printStackTrace();
                                }
                            }
                        }
                    }

                    try (Transaction tx = graphDb.beginTx()) {
                        if (cName != null) {
                            tx.acquireWriteLock(parent);
                            Node child = findNode("Child", "name", cName, graphDb);
                            Relationship r = child.getSingleRelationship(RelationshipTypes.PARENT_CHILD, Direction.INCOMING);
                            r.delete();
                            child.delete();
                            LOG.info(">>>> DELETING NODES: " + cName);
                        }
                        tx.success();
                    } finally {
                        cdl.countDown();
                    }
                }
            });
        }
        cdl.await();
        LOG.info(">>>> DELETED NODES");
        es.shutdown();
    }

    private static void createIndex(String label, String propertyName, GraphDatabaseService graphDb) {
        IndexDefinition indexDefinition;
        try (Transaction tx = graphDb.beginTx()) {
            Schema schema = graphDb.schema();
            indexDefinition = schema.indexFor(DynamicLabel.label(label)).on(propertyName).create();
            tx.success();
        }
        try (Transaction tx = graphDb.beginTx()) {
            Schema schema = graphDb.schema();
            schema.awaitIndexOnline(indexDefinition, 10, TimeUnit.SECONDS);
            tx.success();
        }
    }

    private static void registerShutdownHook(final GraphDatabaseService graphDb) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOG.info("### GRAPHDB SHUTDOWNHOOK INVOKED !!!");
                graphDb.shutdown();
            }
        });
    }

    private enum RelationshipTypes implements RelationshipType {
        PARENT_CHILD
    }

    public static Node findNode(String labelName, String propertyName, Object propertyValue, GraphDatabaseService graphDb) {
        if (propertyValue != null) {
            Label label = DynamicLabel.label(labelName);
            ResourceIterable<Node> ri = graphDb.findNodesByLabelAndProperty(label, propertyName, propertyValue);
            if (ri != null) {
                try {
                    ResourceIterator<Node> iter = ri.iterator();
                    try {
                        if (iter != null && iter.hasNext()) {
                            return iter.next();
                        }
                    } finally {
                        iter.close();
                    }
                } catch (Exception e) {
                    LOG.error("ERROR WHILE FINDING ID: " + propertyValue + " , LABEL: " + labelName + " , PROPERTY: " + propertyName, e);
                }
            }
        }
        return null;
    }
}

Upvotes: 4

Views: 892

Answers (2)

Jacob Davis-Hansson
Jacob Davis-Hansson

Reputation: 2663

This has been resolved. I've run your example code above on the current latest release to double-check; this is on Neo4j 2.3.1:

YourKit Heap

Upvotes: 0

Debajyoti Roy
Debajyoti Roy

Reputation: 995

The only solution I am aware of so far is to not acquire a write lock in the same transaction in which the Cypher query is executed. Simply catching NotFoundException and retrying the query seems to work fine; the memory leak doesn't occur in this case.

The following is the fixed code:

package net.ahm.graph;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.neo4j.cypher.javacompat.ExecutionEngine;
import org.neo4j.cypher.javacompat.ExecutionResult;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.DynamicLabel;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterable;
import org.neo4j.graphdb.ResourceIterator;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.graphdb.schema.IndexDefinition;
import org.neo4j.graphdb.schema.Schema;
import org.neo4j.kernel.impl.util.FileUtils;
import org.neo4j.kernel.impl.util.StringLogger;

public class DeleteLab {
    private static final int CHILDREN = 10000;
    private static final Logger LOG = Logger.getLogger(DeleteLab.class);

    public static void main(String[] args) throws Exception {
        FileUtils.deleteRecursively(new File("graphdb"));
        final GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder("graphdb")
                .setConfig(GraphDatabaseSettings.use_memory_mapped_buffers, "true").setConfig(GraphDatabaseSettings.cache_type, "strong")
                .newGraphDatabase();
        registerShutdownHook(graphDb);
        LOG.info(">>>> STARTED GRAPHDB");
        createIndex("Parent", "name", graphDb);
        createIndex("Child", "name", graphDb);
        final Node parent;

        try (Transaction tx = graphDb.beginTx()) {
            parent = graphDb.createNode(DynamicLabel.label("Parent"));
            parent.setProperty("name", "parent");
            tx.success();
        }

        try (Transaction tx = graphDb.beginTx()) {
            for (int i = 0; i < CHILDREN; i++) {
                Node child = graphDb.createNode(DynamicLabel.label("Child"));
                child.setProperty("name", "child" + i);
                child.setProperty("count", i);
                parent.createRelationshipTo(child, RelationshipTypes.PARENT_CHILD);
            }
            tx.success();
        }
        LOG.info(">>>> CREATED NODES");
        final ExecutionEngine engine = new ExecutionEngine(graphDb, StringLogger.SYSTEM);
        ExecutorService es = Executors.newFixedThreadPool(50);
        final CountDownLatch cdl = new CountDownLatch(CHILDREN);

        for (int i = 0; i < CHILDREN; i++) {
            final int count = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    String cName = null;
                    boolean success = false;
                    try (Transaction tx = graphDb.beginTx()) {
                        while (!success) {
                            try {
                                Map<String, Object> params = new HashMap<String, Object>();
                                params.put("cCount", count);
                                ExecutionResult result = engine.execute(
                                        "match (n:Parent)-[:PARENT_CHILD]->(m:Child) where m.count={cCount} return m.name", params);
                                for (Map<String, Object> row : result) {
                                    cName = (String) row.get("m.name");
                                    break;
                                }
                                success = true;
                            } catch (org.neo4j.graphdb.NotFoundException e) {
                                LOG.info(">>>> RETRY QUERY ON NotFoundException: " + count);
                                try {
                                    Thread.sleep((long) Math.random() * 100);
                                } catch (InterruptedException e1) {
                                    e1.printStackTrace();
                                }
                            }
                        }
                    }

                    try (Transaction tx = graphDb.beginTx()) {
                        if (cName != null) {
                            tx.acquireWriteLock(parent);
                            Node child = findNode("Child", "name", cName, graphDb);
                            Relationship r = child.getSingleRelationship(RelationshipTypes.PARENT_CHILD, Direction.INCOMING);
                            r.delete();
                            child.delete();
                            LOG.info(">>>> DELETING NODES: " + cName);
                        }
                        tx.success();
                    } finally {
                        cdl.countDown();
                    }
                }
            });
        }
        cdl.await();
        LOG.info(">>>> DELETED NODES");
        es.shutdown();
    }

    private static void createIndex(String label, String propertyName, GraphDatabaseService graphDb) {
        IndexDefinition indexDefinition;
        try (Transaction tx = graphDb.beginTx()) {
            Schema schema = graphDb.schema();
            indexDefinition = schema.indexFor(DynamicLabel.label(label)).on(propertyName).create();
            tx.success();
        }
        try (Transaction tx = graphDb.beginTx()) {
            Schema schema = graphDb.schema();
            schema.awaitIndexOnline(indexDefinition, 10, TimeUnit.SECONDS);
            tx.success();
        }
    }

    private static void registerShutdownHook(final GraphDatabaseService graphDb) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                LOG.info("### GRAPHDB SHUTDOWNHOOK INVOKED !!!");
                graphDb.shutdown();
            }
        });
    }

    private enum RelationshipTypes implements RelationshipType {
        PARENT_CHILD
    }

    public static Node findNode(String labelName, String propertyName, Object propertyValue, GraphDatabaseService graphDb) {
        if (propertyValue != null) {
            Label label = DynamicLabel.label(labelName);
            ResourceIterable<Node> ri = graphDb.findNodesByLabelAndProperty(label, propertyName, propertyValue);
            if (ri != null) {
                try {
                    ResourceIterator<Node> iter = ri.iterator();
                    try {
                        if (iter != null && iter.hasNext()) {
                            return iter.next();
                        }
                    } finally {
                        iter.close();
                    }
                } catch (Exception e) {
                    LOG.error("ERROR WHILE FINDING ID: " + propertyValue + " , LABEL: " + labelName + " , PROPERTY: " + propertyName, e);
                }
            }
        }
        return null;
    }
}

Upvotes: 2

Related Questions