107
107

Reputation: 550

need help in joining spark RDD's in java

Need to perform the following join operation in spark

JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL);

To perform this operation i need two JavaPairRDD which are closedMTMPNL and openMTMPNL. OpenMTM and closeMTM are working fine but keyBy on both RDD are giving error at runtime.

JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){
                public String call(MarkToMarketPNL mtm) throws Exception
                {
                        return mtm.getTaxlot();
                }
            }); 

JavaPairRDD<String,MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL,String>(){
                    public String call(MarkToMarketPNL mtm) throws Exception
                    {
                        return mtm.getTaxlot();
                    }
                }); 

Is there any other way in which i can join openMTM and closeMTM RDD's? As of now trying to get two RDD's on which the join can be performed on String. What causing the exception to occur??

Attaching the stack trace

java.lang.NullPointerException
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Upvotes: 1

Views: 1440

Answers (3)

Abhiram
Abhiram

Reputation: 362

I had faced the same issue. When join operation is performed internally <key,Iterable<values>> gets created. If one of the Iterable<values> object is null, we see the null pointer exception like above.

Make sure that none of the values are null before performing the join.

Upvotes: 0

Malemi
Malemi

Reputation: 69

This exception is due to return a null value from one of your functions. You can return null and after that filter null tuples such as:

JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){
            public String call(MarkToMarketPNL mtm) throws Exception
            {
                    return mtm.getTaxlot();
            }
        }).filter(new Function<Tuple2<String, MarkToMarketPNL>, Boolean>() {

        @Override
        public Boolean call(Tuple2<String, MarkToMarketPNL> arg) throws Exception {
            return arg == null ? false : true;
       }
    }); 

Upvotes: 1

Daniel Darabos
Daniel Darabos

Reputation: 27455

I think the error is not in the code that you included in the question. Spark is trying to run count on an RDD. The code you included does not call count, so that's one sign. But the exception suggests that the RDD being counted has an iterator that was created in Java and is now being converted to a Scala iterator. At that point it turns out this iterator is in fact null.

Does your code produce an iterator somewhere? Perhaps in a mapPartitions call or some such?

Upvotes: 0

Related Questions