Cassie

Reputation: 3099

DB connection with foreachRDD Spark Streaming

I create and pass a connection to the database while streaming the data. Reading the data from the file and creating Neo4j sessions every time adds performance overhead. How can I change the existing code to improve the application's performance? Should I change foreachRDD to foreachPartition so that a separate connection object is created per partition?

Here is my code for streaming:

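import java.util.Calendar
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.spark._ // adds saveToEs to RDDs (elasticsearch-hadoop)

// Split each incoming line into words
val wordsArrays: DStream[Array[String]] = values.map(t => t.split(", "))

wordsArrays.foreachRDD { rdd =>
  rdd.flatMap { data =>
    // Runs once per record: the Neo4j config is looked up and a query is issued for every element
    val recommendations = execNeo4jSearchQuery(neo4jConfigs.getNeo4jConfig(args(1)), data)
    val calendarTime = Calendar.getInstance.getTime
    convertDataToMap(recommendations, calendarTime)
  }.saveToEs("rdd-timed/output")
}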

Upvotes: 1

Views: 321

Answers (2)

Cassie

Reputation: 3099

Using mapPartitions is preferable, because the database configuration is created once per partition instead of once per record. The RDD produced by the updated partitions is then saved to Elasticsearch:

wordsArrays.foreachRDD { rdd =>
  rdd.mapPartitions { partition =>
    // Looked up once per partition instead of once per record
    val neo4jConfig = neo4jConfigs.getNeo4jConfig(args(1))

    partition.map { data =>
      val recommendations = execNeo4jSearchQuery(neo4jConfig, data)
      val calendarTime = Calendar.getInstance.getTime
      convertDataToMap(recommendations, calendarTime)
    }.toList.flatten.iterator // materialize the whole partition, then hand back an iterator
  }.saveToEs("rdd-timed/output")
}
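To see why the per-partition version helps, here is a minimal plain-Scala sketch that runs without Spark. `FakeSession`, `SessionFactory` and `PartitionSessions` are hypothetical stand-ins (not the Neo4j driver API), and each inner `Seq` plays the role of one partition's iterator. It counts how many sessions each strategy opens, and it also shows why materializing the partition (`toList`) matters if the session is closed at the end: an `Iterator` is lazy, so closing the session before the iterator is consumed would make the queries run against a closed session.

```scala
// Hypothetical stand-in for a database session; NOT the Neo4j driver API.
final class FakeSession {
  private var isOpen = true

  // Pretend query: upper-cases each word, but only while the session is open.
  def query(words: Array[String]): Seq[String] = {
    require(isOpen, "session already closed")
    words.toSeq.map(_.toUpperCase)
  }

  def close(): Unit = isOpen = false
}

// Counts opened sessions so the two strategies can be compared.
final class SessionFactory {
  var opened = 0
  def open(): FakeSession = { opened += 1; new FakeSession }
}

object PartitionSessions {
  // Pattern from the question: one session per record.
  def perRecord(f: SessionFactory, partition: Iterator[Array[String]]): List[String] =
    partition.flatMap { words =>
      val s = f.open()
      try s.query(words) finally s.close()
    }.toList

  // mapPartitions-style pattern: one session shared by the whole partition.
  def perPartition(f: SessionFactory, partition: Iterator[Array[String]]): List[String] = {
    val s = f.open()
    // toList consumes the lazy Iterator while the session is still open;
    // without it, the queries would run after close() and fail the require.
    try partition.flatMap(words => s.query(words)).toList finally s.close()
  }
}
```

With two partitions holding five records in total, the per-record version opens five sessions while the per-partition version opens two, and both return the same results.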

Upvotes: 0

Ilya Brodezki

Reputation: 336

foreachPartition lets you create an object once per partition rather than once per map iteration. It's useful when you need a single connection for each partition.

But in your case, all the objects you create seem to depend either on the input value of the map or on the current time, so I don't see how it would help you.

Unless execNeo4jSearchQuery creates a connection on every call, I don't see how it helps. If it does create a connection on each call, and that connection doesn't depend on the data, then creating it once per partition instead will improve the code (but there's a good chance the bottleneck isn't there, so you may not see a big improvement).
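Per-partition creation still opens a new connection for every partition of every micro-batch. The Spark Streaming programming guide's foreachRDD design patterns go one step further and suggest a static, lazily initialized pool so connections are reused across batches. The sketch below is a simplified plain-Scala illustration of that idea, using a single lazily created connection per JVM (that is, per executor). `Conn`, `ConnHolder` and `Batches` are hypothetical names, and the sketch assumes the shared object is safe to use from several task threads, which should be checked against the Neo4j driver documentation before reusing a real session this way.

```scala
// Hypothetical connection type standing in for a real driver object.
final class Conn(val id: Int) {
  def search(words: Array[String]): Seq[String] = words.toSeq.map(w => s"$w@$id")
}

// One lazily created connection per JVM (per Spark executor), reused by every
// partition of every micro-batch that runs there. Scala evaluates a lazy val's
// initializer on first access and caches the result.
object ConnHolder {
  var created = 0
  lazy val conn: Conn = { created += 1; new Conn(created) }
}

object Batches {
  // Processes one partition of one micro-batch using the shared connection.
  def processPartition(partition: Iterator[Array[String]]): List[String] = {
    val c = ConnHolder.conn // no new connection after the first access
    partition.flatMap(words => c.search(words)).toList
  }
}
```

In Spark code, the body of `processPartition` would sit inside `rdd.foreachPartition` or `rdd.mapPartitions`. The trade-off is that per-batch setup cost disappears, but the long-lived object's lifecycle (closing it on shutdown, reconnecting after failures) now has to be managed separately.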

Upvotes: 2
