Reputation: 3099
I am creating and passing a connection to the database while streaming the data. Reading the data from the file and creating a Neo4j session every time adds performance overhead. How can I change the existing code to improve the application's performance? Should I change foreachRDD to foreachPartition in order to create a separate object for the connection?
Here is my code for streaming:
val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))

wordsArrays.foreachRDD(rdd => {
  rdd.flatMap(data => {
    val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
    val calendarTime = Calendar.getInstance.getTime
    val recommendationsMap = convertDataToMap(recommendations, calendarTime)
    recommendationsMap
  }).saveToEs("rdd-timed/output")
})
Upvotes: 1
Views: 321
Reputation: 3099
Using the database connection inside mapPartitions is preferable; the RDD with the updated partitions is then saved to Elasticsearch:
wordsArrays.foreachRDD(rdd => {
  rdd.mapPartitions { partition =>
    // create the Neo4j config once per partition, not once per record
    val neo4jConfig = neo4jConfigurations.getNeo4jConfig(args(1))
    val result = partition.map { data =>
      val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
      val calendarTime = Calendar.getInstance.getTime
      convertDataToMap(recommendations, calendarTime)
    }.toList.flatten
    result.iterator
  }.saveToEs("rdd-timed/output")
})
Upvotes: 0
Reputation: 336
foreachPartition enables you to create an object per partition rather than per map iteration, which is useful when you need a single connection for each partition.
But in your case it seems all the objects you create depend either on the input value of the map or on the current time, so I don't see how it would help you.
Unless you create a connection on each run of execNeo4jSearchQuery, I don't see how it helps. But if you do create a connection on each call to that function, and that connection doesn't depend on the data, then it will help improve the code (though there is a good chance the bottleneck isn't there, so you won't see a great improvement). A minimal sketch of that per-partition pattern follows.
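For illustration only, reusing the helpers from your question (execNeo4jSearchQuery, convertDataToMap, neo4jConfigs); whether it actually helps depends on whether execNeo4jSearchQuery opens a connection per call:

wordsArrays.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // one config (and whatever connection state it carries) per partition
    val neo4jConfig = neo4jConfigs.getNeo4jConfig(args(1))
    partition.foreach { data =>
      val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
      val calendarTime = Calendar.getInstance.getTime
      val recommendationsMap = convertDataToMap(recommendations, calendarTime)
      // recommendationsMap would have to be written out here with an
      // Elasticsearch client, since foreachPartition produces no RDD to
      // call saveToEs on -- which is why the other answer uses mapPartitions.
    }
  }
}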
Upvotes: 2