Reputation: 550
I need to perform the following join operation in Spark:
JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL);
To perform this operation I need two JavaPairRDDs, closedMTMPNL and openMTMPNL. openMTM and closedMTM are working fine, but keyBy on both RDDs throws an error at runtime.
JavaPairRDD<String, MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    @Override
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});
JavaPairRDD<String, MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    @Override
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});
Is there any other way in which I can join the openMTM and closedMTM RDDs? As of now I am just trying to get two RDDs on which the join can be performed on a String key. What is causing the exception to occur?
Attaching the stack trace:
java.lang.NullPointerException
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Upvotes: 1
Views: 1440
Reputation: 362
I had faced the same issue. When the join operation is performed, internally a <key, Iterable<values>> pair gets created. If one of the Iterable<values> objects is null, we see a null pointer exception like the one above.
Make sure that none of the values are null before performing the join.
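One way to do that check, as a minimal sketch reusing the MarkToMarketPNL and getTaxlot() names from the question (openMTMClean is just an illustrative intermediate name; the same filter would be applied to closedMTM as well):
// imports assumed: org.apache.spark.api.java.JavaRDD,
// org.apache.spark.api.java.JavaPairRDD,
// org.apache.spark.api.java.function.Function
JavaRDD<MarkToMarketPNL> openMTMClean = openMTM.filter(new Function<MarkToMarketPNL, Boolean>() {
    @Override
    public Boolean call(MarkToMarketPNL mtm) throws Exception {
        // drop records that cannot produce a non-null join key
        return mtm != null && mtm.getTaxlot() != null;
    }
});
JavaPairRDD<String, MarkToMarketPNL> openMTMPNL = openMTMClean.keyBy(new Function<MarkToMarketPNL, String>() {
    @Override
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
});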
Upvotes: 0
Reputation: 69
This exception is caused by returning a null value from one of your functions. You can still return null and afterwards filter out the null tuples, for example:
JavaPairRDD<String, MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL, String>() {
    @Override
    public String call(MarkToMarketPNL mtm) throws Exception {
        return mtm.getTaxlot();
    }
}).filter(new Function<Tuple2<String, MarkToMarketPNL>, Boolean>() {
    @Override
    public Boolean call(Tuple2<String, MarkToMarketPNL> arg) throws Exception {
        return arg != null;
    }
});
Upvotes: 1
Reputation: 27455
I think the error is not in the code that you included in the question. Spark is trying to run count() on an RDD, and the code you included does not call count(), so that's one sign. But the exception suggests that the RDD being counted has an iterator that was created in Java and is now being converted to a Scala iterator; at that point it turns out this iterator is in fact null.
Does your code produce an iterator somewhere, perhaps in a mapPartitions call or some such?
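For what it's worth, in the Spark 1.x Java API a mapPartitions function returns an Iterable, and returning null from it would produce exactly the JIterableWrapper NullPointerException shown in the trace. A rough sketch of the safe pattern (the RDD and element types here just mirror the question, they are not from your actual code):
// imports assumed: java.util.ArrayList, java.util.Iterator, java.util.List,
// org.apache.spark.api.java.JavaRDD,
// org.apache.spark.api.java.function.FlatMapFunction
JavaRDD<MarkToMarketPNL> perPartition = openMTM.mapPartitions(
        new FlatMapFunction<Iterator<MarkToMarketPNL>, MarkToMarketPNL>() {
    @Override
    public Iterable<MarkToMarketPNL> call(Iterator<MarkToMarketPNL> records) throws Exception {
        List<MarkToMarketPNL> out = new ArrayList<MarkToMarketPNL>();
        while (records.hasNext()) {
            MarkToMarketPNL mtm = records.next();
            if (mtm != null) {
                out.add(mtm);
            }
        }
        return out; // never return null here; an empty list is the safe "no results" value
    }
});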
Upvotes: 0