Zied Hermi

Reputation: 229

spark join function error

I have a big dataframe and I want to join it with a small csv file. So I broadcast my small file:

val rdd = sc.textFile("hdfs:///user/zed/file/app_desc")
val id_dec = sc.broadcast(rdd.map(line=>(line.split(";")(0),line.split(";")(1))).collectAsMap) 

Then I wrote a function that takes the id (input) and returns the description:

def extract_connection_type(input:Integer): String = {
  if (input == null || input.length() == 0)
    input;
  else try {
    id_dec.value.get(input)
  } catch {
     case e: Exception => throw new IOException("UDF:Caught exception processing input row :" + input + e.toString);
  }
}

Afterwards, when I build my schema, I use this function to do the join:

def structure(line: String): structure_Ot = {
  val fields = line.split("\\\t",-1);
  val Name1 = fields(0);
  val Name2 = fields(1);
  val Appd = fields(2).toInt;
  val App = extract_connection_type(Appd);
  val ot_str = new structure_Ot(Name1, Name2, App)
  ot_str
}

but I get this error:

<console>:93: error: type mismatch;
 found   : String
 required: Int

and even when I change my input type to Integer, the error changes to:

found   : String
required: Int 

What is the reason for this error?

Upvotes: 1

Views: 1042

Answers (1)

Tzach Zohar

Reputation: 37852

There are several type mismatches in your code; you'll need to fix them all:

  1. id_dec has the type Broadcast[Map[String, String]] (because you create an RDD[(String, String)] and then call collectAsMap and broadcast on the result). In extract_connection_type you call id_dec.value.get(input), where input has type Int, but the map's keys are Strings. You can fix that either by changing the type of input to String, or by turning id_dec into a Broadcast[Map[Int, String]] by collecting and broadcasting an RDD[(Int, String)] in the first place. If you choose the former, you'll have to adjust your structure function too, so that it passes a String and not an Int.

  2. Another issue with id_dec.value.get(input) is that get returns an Option[V] (where V is the type of the map's values) and not a V. You can use the apply method (implicitly) instead: id_dec.value(input), which returns a String, or throws an exception if no matching key is found.
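
Both points can be checked without a cluster. The following is a minimal sketch in which a plain Scala map stands in for id_dec.value; the object name LookupDemo and the sample entries are invented for illustration and are not part of the original code.

```scala
object LookupDemo {
  // Stand-in for id_dec.value (hypothetical sample data, String keys as in the question).
  val descriptions: scala.collection.Map[String, String] =
    Map("1" -> "wifi", "2" -> "ethernet")

  // get returns an Option, so it cannot be returned from a method declared as String.
  val viaGet: Option[String] = descriptions.get("1")

  // apply returns the value itself and throws NoSuchElementException for a missing key.
  val viaApply: String = descriptions("1")

  // getOrElse is one alternative when a missing key should not throw.
  val withDefault: String = descriptions.getOrElse("3", "unknown")

  // An Int id has to be converted before it can be used with String keys.
  val appId: Int = 2
  val fromInt: String = descriptions(appId.toString)
}
```

Whether a missing id should throw (apply) or fall back to a default (getOrElse) depends on how you want bad rows to be handled.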

Altogether, this is a compiling version of your code:

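import java.io.IOException
import org.apache.spark.broadcast.Broadcast
// collectAsMap returns a scala.collection.Map, so that is the Map used in the annotation below
import scala.collection.Map

val rdd = sc.textFile("hdfs:///user/zed/file/app_desc")
val id_dec: Broadcast[Map[String, String]] = sc.broadcast(rdd.map(line=>(line.split(";")(0),line.split(";")(1))).collectAsMap)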

def extract_connection_type(input: String): String = {
  if (input == null || input.length() == 0)
    input
  else try {
    id_dec.value(input)
  } catch {
    case e: Exception => throw new IOException("UDF:Caught exception processing input row :" + input + e.toString);
  }
}

def structure(line: String): structure_Ot = {
  val fields = line.split("\\\t",-1)
  val Name1 = fields(0)
  val Name2 = fields(1)
  val Appd = fields(2)
  val App = extract_connection_type(Appd)
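  // .toInt assumes the third field of structure_Ot is an Int; it throws NumberFormatException if the description is not numeric
  val ot_str = structure_Ot(Name1, Name2, App.toInt)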
  ot_str
}
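
The other fix mentioned in point 1, keying the map by Int, might look like the sketch below. It uses an in-memory Seq in place of the RDD so that it runs without Spark; IntKeyedLookup, sampleLines, parse, describe and the sample values are assumptions made for illustration. With Spark, the same parsing function could be passed to rdd.map before calling collectAsMap and sc.broadcast.

```scala
object IntKeyedLookup {
  // Hypothetical stand-in for the lines of the app_desc file ("id;description").
  val sampleLines: Seq[String] = Seq("1;wifi", "2;ethernet")

  // Split each line once and convert the id to Int up front.
  def parse(line: String): (Int, String) = {
    val parts = line.split(";")
    (parts(0).toInt, parts(1))
  }

  val idToDescription: Map[Int, String] = sampleLines.map(parse).toMap

  // The lookup can now take the Int id parsed from fields(2) directly.
  def describe(id: Int): String = idToDescription(id)
}
```

With this variant, structure can keep fields(2).toInt and pass the Int straight to the lookup.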

A good way to resolve these issues yourself is to explicitly annotate the type of each value you define. That way the error shows up more clearly: if you expect id_dec to have a certain type and the wrong type is assigned, the error will point at that assignment.
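
As a small illustration of that advice (the object name and the sample line are invented), annotating each value makes the compiler report the mismatch on the line where the wrong type first appears, rather than somewhere downstream:

```scala
object ExplicitTypes {
  // Hypothetical tab-separated input row.
  val line: String = "foo\tbar\t7"
  val fields: Array[String] = line.split("\t", -1)

  // With the annotation, the commented-out line below is rejected right here
  // (type mismatch: found String, required Int), not later where the value is used.
  // val appId: Int = fields(2)

  // Converting explicitly satisfies the declared type.
  val appId: Int = fields(2).toInt
}
```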

Upvotes: 2
