zyxue

Reputation: 8918

spark-shell cannot find the class to be extended

Why can't I load the file with the following code in spark-shell?

import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Aggregator


case class Data(i: Int)

val customSummer = new Aggregator[Data, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Data): Int = b + a.i
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
}.toColumn()

The error:

<console>:47: error: object creation impossible, since:
it has 2 unimplemented members.
/** As seen from <$anon: org.apache.spark.sql.expressions.Aggregator[Data,Int,Int]>, the missing signatures are as follows.
 *  For convenience, these are usable as stub implementations.
 */
  def bufferEncoder: org.apache.spark.sql.Encoder[Int] = ???
  def outputEncoder: org.apache.spark.sql.Encoder[Int] = ???

       val customSummer =  new Aggregator[Data, Int, Int] {

Update: @user8371915's solution works. But the following script fails to load with a different error. I used :load script.sc in spark-shell.

import org.apache.spark.sql.expressions.Aggregator
class MyClass extends Aggregator

Error:

loading ./script.sc...
import org.apache.spark.sql.expressions.Aggregator
<console>:11: error: not found: type Aggregator
       class MyClass extends Aggregator

Update (2017-12-03): it doesn't seem to work in Zeppelin, either.
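One workaround that is sometimes suggested for this kind of REPL limitation is paste mode: unlike :load, which feeds the file to the interpreter line by line, :paste with a file argument compiles the whole file as a single unit, so the import and the class definition are resolved together. (This is a hedged suggestion; :paste's exact behavior depends on the Scala REPL version bundled with your Spark build.)

```
scala> :paste script.sc
```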

Upvotes: 1

Views: 346

Answers (1)

Alper t. Turker

Reputation: 35249

As per the error message, you didn't implement bufferEncoder and outputEncoder. Check the Aggregator API docs for the full list of abstract methods that have to be implemented.

These two should suffice:

def bufferEncoder: Encoder[Int] = Encoders.scalaInt
def outputEncoder: Encoder[Int] = Encoders.scalaInt
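Plugging those two definitions into the snippet from the question gives a complete Aggregator. A minimal sketch, assuming a running spark-shell where the spark session and its implicits are available:

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Data(i: Int)

val customSummer = new Aggregator[Data, Int, Int] {
  def zero: Int = 0                           // initial buffer value
  def reduce(b: Int, a: Data): Int = b + a.i  // fold one row into the buffer
  def merge(b1: Int, b2: Int): Int = b1 + b2  // combine partial buffers
  def finish(r: Int): Int = r                 // buffer -> final result
  // The two previously missing abstract members:
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}.toColumn  // note: toColumn is parameterless, so no parentheses

// Hypothetical usage in spark-shell:
// import spark.implicits._
// Seq(Data(1), Data(2), Data(3)).toDS().select(customSummer).show()
```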

Upvotes: 1
