Matt

Reputation: 123

Completing an RDD based on other RDDs whose choice depends on the data

I'm using Spark 2.1 on a YARN cluster. I have an RDD containing data that I would like to complete based on other RDDs (which correspond to different Mongo databases that I access through https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage, but I don't think that matters here; I mention it just in case).

My problem is that the RDD needed to complete the data depends on the data itself, because each record contains the database to use. Here is a simplified example of what I have to do:

/*
 * The RDD which needs information from databases
 */
val RDDtoDevelop = sc.parallelize(Array(
    Map("dbName" -> "A", "id" -> "id1", "other data" -> "some data"),
    Map("dbName" -> "C", "id" -> "id6", "other data" -> "some other data"),
    Map("dbName" -> "A", "id" -> "id8", "other data" -> "some other other data")))
    .cache()

/*
 * Artificial databases for the example. In reality, mongo-hadoop is used: https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
 * Generating these RDDs is COSTLY, so we only want to generate the ones that are actually needed.
 */
val A = sc.parallelize(Array(
    Map("id" -> "id1", "data" -> "data1"),
    Map("id" -> "id8", "data" -> "data8")
    ))

val B = sc.parallelize(Array(
    Map("id" -> "id1", "data" -> "data1bis"),
    Map("id" -> "id5", "data" -> "data5")
    ))

val C = sc.parallelize(Array(
    Map("id" -> "id2", "data" -> "data2"),
    Map("id" -> "id6", "data" -> "data6")
    ))

val generateRDDfromdbName = Map("A" -> A, "B" -> B, "C" -> C)

and the desired output is:

Map(dbName -> A, id -> id8, other data -> some other other data, new data -> data8)
Map(dbName -> A, id -> id1, other data -> some data, new data -> data1)
Map(dbName -> C, id -> id6, other data -> some other data, new data -> data6)

Since nested RDDs are not possible, I would like to find the approach that makes the best use of Spark's parallelism. I thought of two solutions.

The first is to collect the contents of the needed databases, then convert them back to an RDD to benefit from RDD scalability (if a collection doesn't fit into driver memory, I could do it in several passes). Finally, do a join and filter the contents on id.

The second is to get the RDDs from all needed databases, key them by (dbName, id), and then do the join.

Here is the Scala code:

Solution 1

// Get all needed DB
val dbList = RDDtoDevelop.map(map => map("dbName")).distinct().collect()

// Fill a list with key value pairs as (dbName,db content)
var dbContents = List[(String,Array[Map[String,String]])]()
dbList.foreach(dbName => dbContents = (dbName,generateRDDfromdbName(dbName).collect()) :: dbContents)

// Generate a RDD from this list to benefit to advantages of RDD
val RDDdbs = sc.parallelize(dbContents)

// Key the initial RDD by dbName and join with the contents of dbs
val joinedRDD = RDDtoDevelop.keyBy(map => map("dbName")).join(RDDdbs)

// Check for matched ids between RDD data to develop and dbContents
val result = joinedRDD.map({ case (s, (mapToDevelop, content)) => mapToDevelop + ("new data" -> content.find(mapContent => mapContent("id") == mapToDevelop("id")).get("data")) })

Solution 2

val dbList = RDDtoDevelop.map(map => map("dbName")).distinct().collect()

// Create the list of the database RDDs keyed by (dbName, id)
var dbRDDList = List[RDD[((String,String),Map[String,String])]]()
dbList.foreach(dbName => dbRDDList = generateRDDfromdbName(dbName).keyBy(map => (dbName,map("id"))) :: dbRDDList)

// Create a RDD containing all dbRDD
val RDDdbs = sc.union(dbRDDList)

// Join the initial RDD based on the key with the dbRDDs
val joinedRDD = RDDtoDevelop.keyBy(map => (map("dbName"), map("id"))).join(RDDdbs)

// Reformate the result
val result = joinedRDD.map({ case ((dbName,id),(maptoDevelop,dbmap)) => maptoDevelop + ("new data" -> dbmap("data"))})

Both of them give the desired output. To my mind, the second one seems better, since the matching on database and id uses Spark's parallelism, but I'm not sure about that. Could you please help me choose the better one, or even better, give me clues for a better solution than mine?

Any other comments are appreciated (it's my first question on the site ;) ).

Thanks in advance,

Matt

Upvotes: 1

Views: 121

Answers (1)

Ramesh Maharjan

Reputation: 41957

I would suggest converting your RDDs to DataFrames; then joins, distinct and the other functions you want to apply to the data become very easy.
DataFrames are distributed, and in addition to the DataFrame APIs, SQL queries can be used. More information can be found in the Spark SQL, DataFrames and Datasets Guide and in Introducing DataFrames in Apache Spark for Large Scale Data Science.
Moreover, the foreach and collect calls that make your code run slowly won't be needed.
An example of converting RDDtoDevelop to a DataFrame is given below.

val RDDtoDevelop = sc.parallelize(Array(
  Map("dbName" -> "A", "id" -> "id1", "other data" -> "some data"),
  Map("dbName" -> "C", "id" -> "id6", "other data" -> "some other data"),
  Map("dbName" -> "A", "id" -> "id8", "other data" -> "some other other data")))
  .cache()

Converting the above RDD to a DataFrame:

// toDF needs the implicits from your SparkSession in scope
// (automatic in spark-shell): import spark.implicits._
val developColumns = RDDtoDevelop.take(1).flatMap(map => map.keys)

val developDF = RDDtoDevelop.map { value =>
  // relies on these small Maps preserving the insertion order of their values
  val list = value.values.toList
  (list(0), list(1), list(2))
}.toDF(developColumns: _*)

And the DataFrame looks like this:

+------+---+---------------------+
|dbName|id |other data           |
+------+---+---------------------+
|A     |id1|some data            |
|C     |id6|some other data      |
|A     |id8|some other other data|
+------+---+---------------------+
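
Note that the conversion above relies on the Map's value ordering, which happens to hold for these small maps. As a key-order-independent alternative, here is a hedged sketch that selects fields explicitly by key (assuming every record has the same keys and a SparkSession named `spark`; `developDF2` is a hypothetical name):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Build Rows by looking up each key explicitly,
// so the column order never depends on Map internals.
val columns = Seq("dbName", "id", "other data")
val schema = StructType(columns.map(StructField(_, StringType, nullable = true)))
val rows = RDDtoDevelop.map(m => Row.fromSeq(columns.map(m)))
val developDF2 = spark.createDataFrame(rows, schema)
```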

Converting your A RDD to a DataFrame works the same way.
Source code for A:

val A = sc.parallelize(Array(
  Map("id" -> "id1", "data" -> "data1"),
  Map("id" -> "id8", "data" -> "data8")
))

DataFrame code for A :

val aColumns=A.take(1).flatMap(map=>map.keys)

val aDF = A.map{value =>
  val list=value.values.toList
  (list(0),list(1))
}.toDF(aColumns:_*).withColumn("name", lit("A"))

A new name column holding the database name is added so that the final join with developDF matches the correct database.
Output for DataFrame A:

+---+-----+----+
|id |data |name|
+---+-----+----+
|id1|data1|A   |
|id8|data8|A   |
+---+-----+----+

You can convert B and C in similar ways.
Source for B:

val B = sc.parallelize(Array(
  Map("id" -> "id1", "data" -> "data1bis"),
  Map("id" -> "id5", "data" -> "data5")
))

DataFrame for B :

val bColumns=B.take(1).flatMap(map=>map.keys)

val bDF = B.map{value =>
  val list=value.values.toList
  (list(0),list(1))
}.toDF(bColumns:_*).withColumn("name", lit("B"))

Output for B :

+---+--------+----+
|id |data    |name|
+---+--------+----+
|id1|data1bis|B   |
|id5|data5   |B   |
+---+--------+----+

Source for C:

val C = sc.parallelize(Array(
  Map("id" -> "id2", "data" -> "data2"),
  Map("id" -> "id6", "data" -> "data6")
))

DataFrame code for C:

val cColumns=C.take(1).flatMap(map=>map.keys)

val cDF = C.map{value =>
  val list=value.values.toList
  (list(0),list(1))
}.toDF(cColumns:_*).withColumn("name", lit("C"))

Output for C:

+---+-----+----+
|id |data |name|
+---+-----+----+
|id2|data2|C   |
|id6|data6|C   |
+---+-----+----+

After the conversion, A, B and C can be merged using union

var unionDF = aDF.union(bDF).union(cDF)

Which would be

+---+--------+----+
|id |data    |name|
+---+--------+----+
|id1|data1   |A   |
|id8|data8   |A   |
|id1|data1bis|B   |
|id5|data5   |B   |
|id2|data2   |C   |
|id6|data6   |C   |
+---+--------+----+
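
Since the three conversions above all follow the same pattern, the repetition could be replaced by a generic helper; this is only a sketch, assuming each database map has exactly the keys `id` and `data` (the helper `dbToDF` and the name `unionDF2` are hypothetical):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
// also requires: import spark.implicits._ (automatic in spark-shell)

// Convert one database RDD to a DataFrame and tag it with its name.
def dbToDF(name: String, rdd: RDD[Map[String, String]]): DataFrame =
  rdd.map(m => (m("id"), m("data"))).toDF("id", "data").withColumn("name", lit(name))

// Build the union over all databases in the question's generateRDDfromdbName map.
val unionDF2 = generateRDDfromdbName
  .map { case (name, rdd) => dbToDF(name, rdd) }
  .reduce(_ union _)
```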

Then it's just a matter of joining developDF and unionDF, after renaming the id column of unionDF so it can be dropped later on.

unionDF = unionDF.withColumnRenamed("id", "id1")
unionDF = developDF.join(unionDF, developDF("id") === unionDF("id1") && developDF("dbName") === unionDF("name"), "left").drop("id1", "name")

Finally we have

+------+---+---------------------+-----+
|dbName|id |other data           |data |
+------+---+---------------------+-----+
|A     |id1|some data            |data1|
|C     |id6|some other data      |data6|
|A     |id8|some other other data|data8|
+------+---+---------------------+-----+

You can take it from there.
Note: the lit function requires the following import:

import org.apache.spark.sql.functions._
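
As mentioned above, SQL queries can be used instead of the DataFrame API. Here is a sketch of the same left join written as a query, assuming a SparkSession named `spark` and starting from the freshly unioned DataFrames (the view names `develop` and `dbs` are arbitrary):

```scala
developDF.createOrReplaceTempView("develop")
aDF.union(bDF).union(cDF).createOrReplaceTempView("dbs")

val resultDF = spark.sql("""
  SELECT d.dbName, d.id, d.`other data`, u.data
  FROM develop d
  LEFT JOIN dbs u
    ON d.id = u.id AND d.dbName = u.name
""")
```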

Upvotes: 1
