Reputation: 431
This is first issue i am posting so apologies if i miss some info and mediocre formatting. I can update if required.
I will try to add as many details as possible. I have a not so optimized Spark Job which converts RDBMS data to graph nodes and relations in Neo4j.
To do this. Here is the steps i follow:
Foreach row in 'data' run a graphInsert function which does the following:
a. read contents of the row
b. formulate a neo4j cypher query (We use Merge command so that we have have only one City e.g. Chicago created in Neo4j when Chicago will be present in multiple lines in RDBMS table)
c. connect to neo4j
d. execute the query
e. disconnect from neo4j
Here is the list of problems i am facing.
I know Merge query is slower than create but is there another way to do this instead of connecting and disconnecting for every record? This was my first draft code and maybe i am struggling how i will use one connection to insert from multiple threads on different spark worker nodes. Hence connecting and disconnecting for every record.
I dont know if my implementation is wrong in neo4j part or spark. If anyone can direct me to any documentation which helps me implement this on a better scale it will be helpful as i have a big spark cluster which i need to utilize at full potential for this job.
If you are interested to look at code instead of algorithm. Here is graphInsert implementation in scala:
class GraphInsert extends Serializable{
var case_attributes = new Array[String](4)
var city_attributes = new Array[String](2)
var location_attributes = new Array[String](20)
var incident_attributes = new Array[String](20)
val prop = new Properties()
prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
// properties Neo4j
val url_neo4j = prop.getProperty("url_neo4j")
val neo4j_user = prop.getProperty("neo4j_user")
val neo4j_password = prop.getProperty("neo4j_password")
def graphInsert(data : Row){
val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0) + ":'" +data(11) + "'," +case_attributes(1) + ":'" +data(13) + "'," +case_attributes(2) + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0) + ":" +data(0) + "," +incident_attributes(1) + ":" +data(2) + "," +incident_attributes(2) + ":'" +data(3) + "'," +incident_attributes(3) + ":'" +data(8)+ "'," +incident_attributes(4) + ":" +data(5) + "," +incident_attributes(5) + ":'" +data(4) + "'," +incident_attributes(6) + ":'" +data(6) + "'," +incident_attributes(7) + ":'" +data(1) + "'," +incident_attributes(8) + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0) + ":" +data(9) + "," +location_attributes(1) + ":" +data(10) + "," +location_attributes(2) + ":'" +data(19) + "'," +location_attributes(3) + ":'" +data(20)+ "'," +location_attributes(4) + ":" +data(18) + "," +location_attributes(5) + ":" +data(21) + "," +location_attributes(6) + ":'" +data(17) + "'," +location_attributes(7) + ":" +data(22) + "," +location_attributes(8) + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
println(query)
try{
var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
var stmt = con.createStatement()
var rs = stmt.executeQuery(query)
con.close()
}catch{
case ex: SQLException =>{
println(ex.getMessage)
}
}
}
def operations(sqlContext: SQLContext){
....
#Get 'data' before this step
city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()
data.foreach(graphInsert)
}
object GraphObject {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("GraphNeo4j")
.setMaster("xyz")
.set("spark.cores.max","2")
.set("spark.executor.memory","10g")
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val graph = new GraphInsert()
graph.operations(sqlContext)
}
}
Upvotes: 0
Views: 461
Reputation: 431
I am done improving the process but nothing could make it as fast as LOAD command in Cypher.
Hope this helps someone though:
use foreachPartition
instead of foreach
gives significant gain while doing such process. Also adding periodic commit using cypher.
Upvotes: 0
Reputation: 3544
Whatever you write inside the closure i.e it needs to be executed on Worker gets distributed. You can read more about it here : http://spark.apache.org/docs/latest/programming-guide.html#understanding-closures-a-nameclosureslinka
And as you increase the number of cores, I think it must not effect the application because if you do not specify it ! then it takes the greedy approach ! I hope this document helps .
Upvotes: 0