Smallthing

Reputation: 1

How do I iterate all elements and the members of an element throughout a Spark RDD?

Totally fresh on Scala and Spark! I have a series of (String, (Double, Double, Int)) data as a k-v array. I've called the groupByKey() method on this data to get several (String, Seq[(Double, Double, Int)]) groups. How can I step into the first big group, iterate through its Seq part, and then move on to the next big group? Say, maybe I get

("id1", [(1.1,2.2,3), (4.4,5.5,6), (7.7,8.8,9)]),
("id2", [(10.10,11.11,12), (13.13,14.14,15)]) 

in my memory. I'm gonna dig into "id1" to see its three tuples one by one and even make some changes. Then I jump into "id2" and iterate its data too. How do I code this? >_<

Upvotes: 0

Views: 630

Answers (1)

Stephen Carman

Reputation: 997

If it's an RDD of (String, Seq[(Double, Double, Int)]), you can iterate over it with a standard map.

import org.apache.spark.rdd.RDD

val data: RDD[(String, Seq[(Double, Double, Int)])] = ??? // your RDD here
data.map {
  case (key, value) =>
    value.map {
      case (first, second, third) => first * second * third
    }
}
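If you want to keep the keys while transforming each group, RDDs also have mapValues, and the same pattern works on plain Scala collections. Here is a minimal sketch without Spark (the data and the "double the first component" transformation are just for illustration); on an RDD you would call rdd.mapValues with the same function body:

```scala
// Plain-Scala sketch of the same pattern: transform each group's Seq
// while keeping its key intact. RDD.mapValues works the same way.
object MapValuesSketch {
  def main(args: Array[String]): Unit = {
    val grouped: Map[String, Seq[(Double, Double, Int)]] = Map(
      "id1" -> Seq((1.1, 2.2, 3), (4.4, 5.5, 6), (7.7, 8.8, 9)),
      "id2" -> Seq((10.10, 11.11, 12), (13.13, 14.14, 15))
    )

    // Double the first component of every tuple; the keys are untouched
    val changed: Map[String, Seq[(Double, Double, Int)]] =
      grouped.mapValues(seq => seq.map { case (a, b, c) => (a * 2, b, c) }).toMap

    changed.foreach { case (k, v) => println(s"$k -> $v") }
  }
}
```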

I would consider a DataFrame, or structuring your data in some other fashion, as this is probably a pretty unwieldy way to structure it.

You can find some information about DataFrames/Datasets here: http://spark.apache.org/docs/latest/sql-programming-guide.html. They might be better suited to your problem, and if you are not comfortable with maps they would let you write more SQL-like statements instead.
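As a rough sketch of what that could look like (assuming Spark SQL is on the classpath; the column names "id", "x", "y", "n" are made up for this example), you could flatten the tuples into columns and aggregate per key instead of grouping by hand:

```scala
// Hypothetical DataFrame sketch: one row per tuple, with the key as a column.
import org.apache.spark.sql.SparkSession

object DataFrameSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("DFExample").getOrCreate()
  import spark.implicits._

  val df = Seq(
    ("id1", 1.1, 2.2, 3),
    ("id1", 4.4, 5.5, 6),
    ("id1", 7.7, 8.8, 9),
    ("id2", 10.10, 11.11, 12),
    ("id2", 13.13, 14.14, 15)
  ).toDF("id", "x", "y", "n")

  // SQL-style aggregation per key instead of groupByKey + a manual map
  df.groupBy("id").sum("x", "y", "n").show()

  spark.stop()
}
```

With this layout each "make some changes" step becomes a column expression rather than a loop over nested Seqs.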

Here is a complete and somewhat dirty example:

import org.apache.spark._
import org.apache.spark.rdd.RDD

object SparkExample extends App {

  val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
  val sess: SparkContext = new SparkContext(conf)

  val data: Seq[(String, Seq[(Double, Double, Int)])] = Seq(
    ("id1", Seq((1.1, 2.2, 3), (4.4, 5.5, 6), (7.7, 8.8, 9))),
    ("id2", Seq((10.10, 11.11, 12), (13.13, 14.14, 15)))
  )

  val rdd: RDD[(String, Seq[(Double, Double, Int)])] = sess.parallelize(data)

  val d: Array[Seq[Double]] = rdd.map {
    case (key, value) => value.map {
      case (first, second, third) => first + second + third
    }
  }.collect()
  println(d.mkString(", "))
}

Upvotes: 3
