Nik

Reputation: 431

Spark UDF optimization for Graph Database (Neo4j) inserts

This is the first question I am posting, so apologies if I miss some info or the formatting is mediocre. I can update it if required.

I will try to add as many details as possible. I have a not-so-optimized Spark job which converts RDBMS data into graph nodes and relationships in Neo4j.

To do this, here are the steps I follow:

  1. Create a denormalized DataFrame 'data' with Spark SQL and joins.
  2. For each row in 'data', run a graphInsert function which does the following:

    a. Read the contents of the row.
    b. Formulate a Neo4j Cypher query (we use the MERGE command so that, for example, only one City node for Chicago is created in Neo4j even when Chicago appears in multiple rows of the RDBMS table).
    c. Connect to Neo4j.
    d. Execute the query.
    e. Disconnect from Neo4j.

Here is the list of problems I am facing:

  1. Inserts are slow.

I know a MERGE query is slower than CREATE, but is there another way to do this instead of connecting and disconnecting for every record? This is my first draft of the code, and I am struggling with how to use one connection to insert from multiple threads on different Spark worker nodes; hence I connect and disconnect for every record.

  2. The job is not scalable. It only runs fine with 1 core. As soon as I run the job with 2 Spark cores, I suddenly get 2 cities with the same name, even though I am running MERGE queries. E.g. there are 2 Chicago cities, which defeats the purpose of MERGE. I am assuming that MERGE behaves something like "create if not exists".

I don't know whether my implementation is wrong on the Neo4j side or the Spark side. If anyone can direct me to documentation that would help me implement this at a better scale, that would be great, as I have a big Spark cluster which I need to utilize at its full potential for this job.

If you are interested in looking at the code instead of the algorithm, here is the graphInsert implementation in Scala:

import java.sql.{DriverManager, SQLException}
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

class GraphInsert extends Serializable {
  var case_attributes = new Array[String](4)
  var city_attributes = new Array[String](2)
  var location_attributes = new Array[String](20)
  var incident_attributes = new Array[String](20)
  // Relationship type names used in graphInsert; presumably populated elsewhere,
  // like the attribute arrays (not shown in this snippet).
  var relation_case_incident = ""
  var relation_incident_location = ""
  val prop = new Properties()
  prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
  // Neo4j connection properties
  val url_neo4j = prop.getProperty("url_neo4j")
  val neo4j_user = prop.getProperty("neo4j_user")
  val neo4j_password = prop.getProperty("neo4j_password")


  def graphInsert(data: Row) {
    val query =
      "MERGE (d:CITY {name:city_attributes(0)})\n" +
      "MERGE (a:CASE { " + case_attributes(0) + ":'" + data(11) + "'," + case_attributes(1) + ":'" + data(13) + "'," + case_attributes(2) + ":'" + data(14) + "'}) \n" +
      "MERGE (b:INCIDENT { " + incident_attributes(0) + ":" + data(0) + "," + incident_attributes(1) + ":" + data(2) + "," + incident_attributes(2) + ":'" + data(3) + "'," + incident_attributes(3) + ":'" + data(8) + "'," + incident_attributes(4) + ":" + data(5) + "," + incident_attributes(5) + ":'" + data(4) + "'," + incident_attributes(6) + ":'" + data(6) + "'," + incident_attributes(7) + ":'" + data(1) + "'," + incident_attributes(8) + ":" + data(7) + "}) \n" +
      "MERGE (c:LOCATION { " + location_attributes(0) + ":" + data(9) + "," + location_attributes(1) + ":" + data(10) + "," + location_attributes(2) + ":'" + data(19) + "'," + location_attributes(3) + ":'" + data(20) + "'," + location_attributes(4) + ":" + data(18) + "," + location_attributes(5) + ":" + data(21) + "," + location_attributes(6) + ":'" + data(17) + "'," + location_attributes(7) + ":" + data(22) + "," + location_attributes(8) + ":" + data(23) + "}) \n" +
      "MERGE (a) - [r1:" + relation_case_incident + "]->(b)-[r2:" + relation_incident_location + "]->(c)-[r3:belongs_to]->(d);"
    println(query)
    try {
      // A new connection is opened and closed for every single row.
      val con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
      val stmt = con.createStatement()
      stmt.executeQuery(query)
      con.close()
    } catch {
      case ex: SQLException =>
        println(ex.getMessage)
    }
  }

  def operations(sqlContext: SQLContext) {
    // ....
    // Get 'data' (and 'entity_metadata') before this step
    city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x => x.getString(5)).collect()
    case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x => x.getString(5)).collect()
    location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x => x.getString(5)).collect()
    incident_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x => x.getString(5)).collect()

    data.foreach(graphInsert)
  }
}

object GraphObject {
  def main(args: Array[String]) {  
      val conf = new SparkConf()
        .setAppName("GraphNeo4j")
        .setMaster("xyz")
        .set("spark.cores.max","2")
        .set("spark.executor.memory","10g")

      Logger.getLogger("org").setLevel(Level.ERROR)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val graph = new GraphInsert()
      graph.operations(sqlContext)

  }
}

Upvotes: 0

Views: 461

Answers (2)

Nik

Reputation: 431

I am done improving the process, but nothing could make it as fast as the LOAD CSV command in Cypher. Hope this helps someone though: using foreachPartition instead of foreach gives a significant gain for this kind of process, and so does adding periodic commit in Cypher.
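For reference, here is a minimal sketch of the foreachPartition idea (not from the original answer). It assumes the same connection properties as in the question and a hypothetical buildQuery(row) helper that stands in for the Cypher-building logic inside graphInsert:

import java.sql.DriverManager
import org.apache.spark.sql.Row

// One connection per partition instead of one per row.
data.foreachPartition { rows: Iterator[Row] =>
  val con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
  val stmt = con.createStatement()
  try {
    // buildQuery is a hypothetical helper returning the MERGE query for one row.
    rows.foreach(row => stmt.execute(buildQuery(row)))
  } finally {
    stmt.close()
    con.close()
  }
}

The periodic-commit part refers to Cypher's LOAD CSV path (e.g. USING PERIODIC COMMIT 1000 LOAD CSV WITH HEADERS FROM 'file:///...' AS row MERGE ...), which batches the transaction commits on the Neo4j side rather than being driven from Spark.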

Upvotes: 0

Shivansh

Reputation: 3544

Whatever you write inside the closure, i.e. whatever needs to be executed on the workers, gets distributed. You can read more about it here: http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka

And as you increase the number of cores, I think it should not affect the application, because if you do not specify the number of cores, Spark takes the greedy approach and uses whatever is available anyway. I hope this document helps.
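As a small illustration of the closure point (not part of the original answer): anything referenced inside the function passed to map or foreach is serialized and shipped to the executors, which is why GraphInsert extends Serializable in the question's code.

// Illustrative sketch only: `prefix` is created on the driver, but because it is
// referenced inside the closure passed to map, Spark serializes it and sends a
// copy to every executor; each task then works on its own copy.
// (Column 0 is treated as the city name here purely for illustration.)
val prefix = "MERGE (c:CITY {name:'"
val cityQueries = data.map(row => prefix + row.getString(0) + "'})")
cityQueries.take(5).foreach(println)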

Upvotes: 0
