I have a big DataFrame that I want to join with a small CSV file, so I broadcast the small file:
val rdd = sc.textFile("hdfs:///user/zed/file/app_desc")
val id_dec = sc.broadcast(rdd.map(line=>(line.split(";")(0),line.split(";")(1))).collectAsMap)
Then I wrote a function that takes the id (input) and returns its description:
def extract_connection_type(input:Integer): String = {
if (input == null || input.length() == 0)
input;
else try {
id_dec.value.get(input)
} catch {
case e: Exception => throw new IOException("UDF:Caught exception processing input row :" + input + e.toString);
}
}
Afterwards, when I build my schema, I use this function to do the join:
def structure(line: String): structure_Ot = {
val fields = line.split("\\\t",-1);
val Name1 = fields(0);
val Name2 = fields(1);
val Appd = fields(2).toInt;
val App = extract_connection_type(Appd);
val ot_str = new structure_Ot(Name1, Name2, App)
ot_str
}
But I get this error:
<console>:93: error: type mismatch;
found : String
required: Int
Even when I change the type of my input to Integer, I get:
found : String
required: Int
What is causing this error?
There are several type mismatches in your code, and you'll need to fix all of them:
id_dec has the type Broadcast[scala.collection.Map[String, String]], because you create an RDD[(String, String)], call collectAsMap on it (which returns a scala.collection.Map) and then broadcast the result. In your extract_connection_type, you call id_dec.value.get(input) where input is an integer (declared as Integer, and called with the Int value Appd from structure), but the map's key is a String. You can fix that either by changing the type of input to String, or by turning id_dec into a Broadcast[scala.collection.Map[Int, String]] by collecting and broadcasting an RDD[(Int, String)] in the first place. If you choose the former, you'll also have to adjust your structure function so that it passes a String and not an Int.
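The second option (keying the map by Int) can be sketched in plain Scala. This is a minimal sketch that assumes each line of app_desc looks like id;description with a numeric id; the sample lines are made up, and a local Seq stands in for the RDD so the snippet runs without Spark.

```scala
// Stand-in for sc.textFile(...): hypothetical sample lines in "id;description" form
val lines: Seq[String] = Seq("1;Mobile", "2;Desktop", "3;Tablet")

// Split each line once and convert the key to Int, so the map is keyed by Int
val descriptions: Map[Int, String] = lines.map { line =>
  val parts = line.split(";")
  parts(0).trim.toInt -> parts(1)
}.toMap

// The lookup key now has the same type as the map's key, so this compiles
def describe(id: Int): Option[String] = descriptions.get(id)

println(describe(2)) // Some(Desktop)
println(describe(9)) // None
```

In the Spark version, the same parsing would go inside rdd.map(...) before .collectAsMap, and broadcasting that result gives a Broadcast[scala.collection.Map[Int, String]].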
Another issue with id_dec.value.get(input) is that get returns an Option[V] (where V is the type of the map's values), not a V. You can use the apply method (implicitly) instead: id_dec.value(input), which returns a String or throws a NoSuchElementException if no matching key is found.
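To see the difference between get and apply, here is a small sketch on an ordinary Map[String, String] standing in for id_dec.value. The keys and values are made up, and getOrElse is shown only as one possible alternative if you'd rather fall back to a default than throw on a missing key.

```scala
// Hypothetical id -> description map, standing in for id_dec.value
val idDesc: Map[String, String] = Map("1" -> "Mobile", "2" -> "Desktop")

// get returns an Option[String]: Some(value) or None
val viaGet: Option[String] = idDesc.get("1")

// apply, written idDesc("1"), returns the String itself
val viaApply: String = idDesc("1")

// getOrElse returns a String and uses a default instead of throwing
val withDefault: String = idDesc.getOrElse("42", "unknown")

// apply on a missing key throws NoSuchElementException
val missing: Either[String, String] =
  try Right(idDesc("42"))
  catch { case e: NoSuchElementException => Left(e.getMessage) }

println(viaGet)      // Some(Mobile)
println(viaApply)    // Mobile
println(withDefault) // unknown
```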
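Putting it all together, here is a version of your code that compiles. It assumes, as the "required: Int" error suggests, that the third field of structure_Ot is an Int; if that field is meant to hold the description String, pass App without .toInt: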
import java.io.IOException
import org.apache.spark.broadcast.Broadcast

val rdd = sc.textFile("hdfs:///user/zed/file/app_desc")
// collectAsMap returns a scala.collection.Map, not the default immutable Map
val id_dec: Broadcast[scala.collection.Map[String, String]] =
  sc.broadcast(rdd.map(line => (line.split(";")(0), line.split(";")(1))).collectAsMap)

def extract_connection_type(input: String): String = {
  if (input == null || input.length() == 0)
    input
  else try {
    // apply (not get) returns the String itself, or throws if the key is missing
    id_dec.value(input)
  } catch {
    case e: Exception => throw new IOException("UDF:Caught exception processing input row :" + input + e.toString)
  }
}

def structure(line: String): structure_Ot = {
  val fields = line.split("\\\t", -1)
  val Name1 = fields(0)
  val Name2 = fields(1)
  val Appd = fields(2) // keep the id as a String to match the map's key type
  val App = extract_connection_type(Appd)
  val ot_str = new structure_Ot(Name1, Name2, App.toInt)
  ot_str
}
A good way to track down these issues yourself is to explicitly annotate the type of each value you define. That way the compiler reports errors closer to their source: if you expect id_dec to have a certain type, a mismatch will be reported at its assignment rather than at some later use.
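As a small illustration of that advice, the sketch below annotates every value, with plain Scala collections standing in for the RDD and the broadcast. The sample lines are hypothetical, and the commented-out line shows the kind of mistake an annotation catches right where the value is defined; it stays commented so the snippet compiles.

```scala
// Hypothetical input lines, standing in for the contents of app_desc
val raw: Seq[String] = Seq("1;Mobile", "2;Desktop")

// Annotating the pair type documents that both key and value are Strings
val pairs: Seq[(String, String)] = raw.map { line =>
  val parts = line.split(";")
  (parts(0), parts(1))
}

// Spark's collectAsMap returns scala.collection.Map, so annotate with that type
val lookup: scala.collection.Map[String, String] = pairs.toMap

// Uncommenting this fails to compile on this line, because the keys are Strings:
// val byInt: Map[Int, String] = pairs.toMap

// The id read from the tab-separated line, kept as a String to match the keys
val appd: String = "2"
val app: String = lookup(appd)
println(app) // Desktop
```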