lightweight

Reputation: 3327

Removing the Some in a left join RDD in Spark

I'm running a left outer join on Spark RDDs, but sometimes the output looks like this:

(k, (v, Some(w)))

or

(k, (v, None))

How do I make it give me back just

(k, (v, (w)))

or

(k, (v, ()))

Here is how I'm combining the 2 files:

def formatMap3(left: String = "", right: String = "")(m: String = "") = {
  // maps each character of m to its own one-character String,
  // then sandwiches the result between the left and right delimiters
  val items = m.map(k => s"$k")
  s"$left$items$right"
}

val combPrdGrp = custPrdGrp3.leftOuterJoin(cmpgnPrdGrp3)

val combPrdGrp2 = combPrdGrp.groupByKey

val combPrdGrp3 = combPrdGrp2.map { case (n, list) => 
  val formattedPairs = list.map { case (a, b) => s"$a $b" }
  s"$n ${formattedPairs.mkString}"
}
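
For reference, the Option wrappers come from leftOuterJoin itself, so the intermediate types here would be (a sketch, assuming custPrdGrp3: RDD[(K, V)] and cmpgnPrdGrp3: RDD[(K, W)]):

val combPrdGrp  = custPrdGrp3.leftOuterJoin(cmpgnPrdGrp3) // RDD[(K, (V, Option[W]))]
val combPrdGrp2 = combPrdGrp.groupByKey                   // RDD[(K, Iterable[(V, Option[W])])]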

Upvotes: 2

Views: 3085

Answers (2)

Jason Scott Lenderman

Reputation: 1918

If you're just interested in getting formatted output without the Somes/Nones, then something like this should work:

val combPrdGrp3 = combPrdGrp2.map { case (n, list) =>
  val formattedPairs = list.map {
    case (a, Some(b)) => s"$a $b"
    case (a, None)    => s"$a ()" // render a missing right-side value as ()
  }
  s"$n ${formattedPairs.mkString}"
}

If you have other uses in mind then you probably need to provide more details.
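
For what it's worth, the same output can also be produced by folding over the Option instead of pattern matching; a minimal sketch, assuming combPrdGrp2 has the shape described in the question:

val combPrdGrp3 = combPrdGrp2.map { case (n, list) =>
  val formattedPairs = list.map { case (a, b) =>
    // fold takes the None result first, then the function applied in the Some case
    b.fold(s"$a ()")(w => s"$a $w")
  }
  s"$n ${formattedPairs.mkString}"
}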

Upvotes: 1

Rohan Aletty

Reputation: 2432

The leftOuterJoin() function in Spark returns tuples containing the join key, the left RDD's value, and an Option of the right RDD's value. To unwrap the Option, simply call getOrElse() on the right value in the resulting RDD. As an example:

scala> val rdd1 = sc.parallelize(Array(("k1", 4), ("k4", 7), ("k8", 10), ("k6", 1), ("k7", 4)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(Array(("k5", 4), ("k4", 3), ("k0", 2), ("k6", 5), ("k1", 6)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:21

scala> val rdd_join = rdd1.leftOuterJoin(rdd2).map { case (a, (b, c: Option[Int])) => (a, (b, (c.getOrElse()))) }
rdd_join: org.apache.spark.rdd.RDD[(String, (Int, AnyVal))] = MapPartitionsRDD[18] at map at <console>:25

scala> rdd_join.take(5).foreach(println)
...
(k4,(7,3))
(k6,(1,5))
(k7,(4,()))
(k8,(10,()))
(k1,(4,6))
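
Note that calling getOrElse() with no argument makes the compiler adapt the empty argument list to the Unit value (), which is why the inferred element type above widens to (Int, AnyVal). If you need a usable Int downstream, here is a sketch with a sentinel default instead (0 is an arbitrary assumption; pick a value that can't collide with real data):

val rdd_typed = rdd1.leftOuterJoin(rdd2).mapValues { case (b, c) =>
  (b, c.getOrElse(0)) // keeps the pair typed as (Int, Int) instead of (Int, AnyVal)
}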

Upvotes: 1
