Reputation: 1119
I tried to use a lambda expression in a Spark task, and it throws a "java.lang.IllegalArgumentException: Invalid lambda deserialization" exception. The exception is thrown when the code looks like "transform(pRDD->pRDD.map(t->t._2))". The two variants I tried are below.
// Option 1: explicit casts to Function
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
    (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map(
        (Function<Tuple2<String,Integer>,Integer>)t->t._2));

// Option 2: the same casts with an added "& Serializable" bound
JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
    (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map(
        (Function<Tuple2<String,Integer>,Integer> & Serializable)t->t._2));
Neither of the two options above worked, whereas if I pass the object "f" below as the argument instead of the lambda expression "t->t._2", it works.
Function f = new Function<Tuple2<String,Integer>,Integer>(){
    @Override
    public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
        return paramT1._2;
    }
};
What is the right way to express that function as a lambda expression? The full program is below.
public static void main(String[] args) {
    Function f = new Function<Tuple2<String,Integer>,Integer>(){
        @Override
        public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
            return paramT1._2;
        }
    };
    JavaStreamingContext ssc = JavaStreamingFactory.getInstance();
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
    JavaDStream<String> words = lines.flatMap(s->{return Arrays.asList(s.split(" "));});
    JavaPairDStream<String,Integer> pairRDD = words.mapToPair(x->new Tuple2<String,Integer>(x,1));
    JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
    JavaDStream<Integer> con = aggregate.transform(
        (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map(
            (Function<Tuple2<String,Integer>,Integer>)t->t._2));
    //JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works
    con.print();
    ssc.start();
    ssc.awaitTermination();
}
Upvotes: 7
Views: 3727
Reputation: 11
I encountered a similar issue and solved it by creating a SerializableFunction interface, as follows:
import java.io.Serializable;
import java.util.function.Function;
interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}
Then replace all your Function usages with SerializableFunction:
private static final SerializableFunction<Row, Boolean> SAMPLE_FUNCTION = row -> {
final String userId = row.getAs("user_id");
return userId != null;
};
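To see why such an interface helps, here is a minimal sketch using plain Java serialization only (no Spark involved). It shows that a lambda whose target type extends Serializable can be written out and read back. The class name SerializableFunctionDemo, the roundTrip helper, and the LENGTH constant are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class SerializableFunctionDemo {

    // Same shape as the interface in this answer: a Function that is also Serializable.
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
    }

    // Hypothetical helper: serialize a value to bytes and read it back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // The lambda is serializable because its target type extends Serializable.
    static final SerializableFunction<String, Integer> LENGTH = s -> s.length();

    public static void main(String[] args) {
        SerializableFunction<String, Integer> copy = roundTrip(LENGTH);
        System.out.println(copy.apply("spark")); // prints 5
    }
}
```

Note that this wraps java.util.function.Function. Spark's Java API has its own org.apache.spark.api.java.function.Function, which already extends Serializable, so this wrapper mainly matters for code written against the java.util.function interfaces.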
Upvotes: 0
Reputation: 72
You could try boxing the return value you're trying to send: return new Integer(paramT1._2);
I'm suggesting this because this source suggests that ints aren't serializable: http://mindprod.com/jgloss/intvsinteger.html
Upvotes: 0
Reputation: 3147
I think the problem is that lambda functions in Java are really a "class" that implements an interface from the package java.util.function, for example the Function interface (https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html). That interface doesn't extend Serializable, and that is the point:
when you use a lambda inside a Spark function, Spark tries to serialize the lambda "class", and it doesn't implement Serializable.
You could try to force Serializable with something like this:
Runnable r = (Runnable & Serializable)() -> System.out.println("Serializable!");
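As a rough check of what the intersection cast changes, the sketch below compares a plain Runnable lambda with one cast to Runnable & Serializable, using plain Java serialization rather than Spark. The class name IntersectionCastDemo and the canSerialize helper are hypothetical names chosen for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class IntersectionCastDemo {

    // Hypothetical helper: reports whether Java serialization accepts the object.
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the object's class does not implement Serializable.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Runnable plain = () -> System.out.println("plain");
        Runnable marked = (Runnable & Serializable) () -> System.out.println("Serializable!");

        System.out.println(canSerialize(plain));  // prints false
        System.out.println(canSerialize(marked)); // prints true
    }
}
```

This only shows that the cast makes the lambda serializable on the writing side. As the question reports, the same cast did not prevent the "Invalid lambda deserialization" error there, so it may not be sufficient on its own.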
Upvotes: 0
Reputation: 27456
I don't know why the lambda doesn't work. Perhaps the problem is with a lambda nested inside a lambda. This seems to be recognized by the Spark documentation.
Contrast the example from http://spark.apache.org/docs/latest/programming-guide.html#basics:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
With the example from http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation:
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
@Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
}
});
The second example uses a Function subclass instead of a lambda, presumably because of the same issue that you discovered.
I don't know whether this is useful for you, but the nested lambdas certainly work in Scala. Consider the Scala version of the previous example:
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
Upvotes: 3