blue-sky

Reputation: 53916

Different behavior when using Spark REPL and standalone Spark program

When I run this code through the Spark REPL:

  val sc = new SparkContext("local[4]", "")

  val x = sc.parallelize(List(("a", "b", 1), ("a", "b", 1), ("c", "b", 1), ("a", "d", 1)))

  // Key each record by (sessionId, uri).
  val byKey = x.map { case (sessionId, uri, count) => (sessionId, uri) -> count }
  val reducedByKey = byKey.reduceByKey(_ + _, 2)

  val grouped = byKey.groupByKey
  // Re-key by sessionId alone, summing the counts per (sessionId, uri).
  val count = grouped.map { case ((sessionId, uri), count) => (sessionId, (uri, count.sum)) }
  val grouped2 = count.groupByKey

The REPL displays the type of grouped2 as:

grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] 
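For reference, spelling out the types at each step (my own ascriptions, continuing the same REPL session; the Seq in the last line matches the output above and this build's Seq-returning groupByKey):

  import org.apache.spark.rdd.RDD

  // Each ascription compiles in this session; the names ending in T are mine.
  val byKeyT:    RDD[((String, String), Int)]      = byKey
  val groupedT:  RDD[((String, String), Seq[Int])] = grouped
  val countT:    RDD[(String, (String, Int))]      = count
  val grouped2T: RDD[(String, Seq[(String, Int)])] = grouped2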

However, if I use the same code in a standalone Spark program, a different type is inferred for grouped2, as shown in this compiler error:

type mismatch;
  found   : org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])]
  required: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])]
  Note: (String, Iterable[(String, Int)]) >: (String, Seq[(String, Int)]), but class RDD is invariant in type T.
    You may wish to define T as -T instead. (SLS 4.5)
  val grouped2 :  org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey
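The mismatch itself seems to be ordinary Scala variance rather than anything Spark-specific. A minimal sketch, with a hypothetical Box class standing in for RDD (both are invariant in T):

  // Box is invariant in T, just like RDD[T].
  class Box[T](val value: T)

  val iterBox: Box[Iterable[Int]] = new Box(Seq(1, 2, 3): Iterable[Int])

  // Seq[Int] <: Iterable[Int], but invariance makes Box[Iterable[Int]] and
  // Box[Seq[Int]] unrelated types, so this fails the same way the annotated
  // grouped2 does:
  // val seqBox: Box[Seq[Int]] = iterBox  // type mismatch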

This is the entire code for standalone mode:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicits that add reduceByKey/groupByKey to pair RDDs
import org.apache.spark.rdd.RDD

object Tester extends App {

  val sc = new SparkContext("local[4]", "")

  val x = sc.parallelize(List(("a", "b", 1), ("a", "b", 1), ("c", "b", 1), ("a", "d", 1)))

  val byKey = x.map { case (sessionId, uri, count) => (sessionId, uri) -> count }
  val reducedByKey = byKey.reduceByKey(_ + _, 2)

  val grouped = byKey.groupByKey
  val count = grouped.map { case ((sessionId, uri), count) => (sessionId, (uri, count.sum)) }
  // This is the line that fails to compile:
  val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey

}
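(For what it's worth, the runtime contents are the same either way; only the static element type differs. A quick check, with the expected result derived by hand from the input list above:)

  // Expected contents, schematically (grouping order not guaranteed):
  // (a, [(b, 2), (d, 1)])
  // (c, [(b, 1)])
  count.groupByKey.collect().foreach(println)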

Shouldn't the types returned in the REPL and in standalone mode be the same?

Update: in standalone mode, grouped2 is inferred as RDD[(String, Iterable[Nothing])], so val grouped2: RDD[(String, Iterable[Nothing])] = count.groupByKey compiles.

So are there three possible types being returned, depending on how the program is run?

Update 2: IntelliJ seems to be inferring the types incorrectly:

val x: org.apache.spark.rdd.RDD[(String, (String, Int))] = sc.parallelize(List(("a", ("b", 1)), ("a", ("b", 1))))

val grouped = x.groupByKey()

IntelliJ infers grouped as org.apache.spark.rdd.RDD[(String, Iterable[Nothing])],

when it should be org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] (which is what the Spark 1.0 REPL infers).
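A workaround I'm using (my own annotation; scalac doesn't need it) is to pin the result type explicitly, which also stops IntelliJ from showing Iterable[Nothing]:

  val grouped: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] =
    x.groupByKey()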

Upvotes: 1

Views: 553

Answers (1)

Travis Brown

Reputation: 139058

For the sake of completeness: the Spark API changed between 0.9 and 1.0 here, and groupByKey now returns a pair with an Iterable as its second member instead of a Seq.
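So on 1.0 the annotation from the question compiles once it names Iterable rather than Seq, for example:

  // groupByKey in Spark 1.0+ returns RDD[(K, Iterable[V])]:
  val grouped2: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] =
    count.groupByKey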

On the IntelliJ issue: it's unfortunately not too hard to confuse IntelliJ's type inference. If it comes up with Nothing, it's pretty likely to be wrong.

Upvotes: 1
