Kevin

Reputation: 91

Union a List of Flume Receivers in Spark Streaming

To increase parallelism, as recommended in the Spark Streaming Programming Guide, I'm setting up multiple receivers and trying to union a list of them. This code works as expected:

    private JavaDStream<SparkFlumeEvent> getEventsWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0)
                .union(receivers.get(1))
                .union(receivers.get(2))
                .union(receivers.get(3))
                .union(receivers.get(4))
                .union(receivers.get(5));

        return unionStreams;
    }

But I don't actually know how many receivers my cluster will have until runtime. When I try to build the union in a loop, I get a NullPointerException.

    private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        List<JavaReceiverInputDStream<SparkFlumeEvent>> receivers = new ArrayList<>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        JavaDStream<SparkFlumeEvent> unionStreams = null;
        for (JavaReceiverInputDStream<SparkFlumeEvent> receiver : receivers) {
            if (unionStreams == null) {
                unionStreams = receiver;
            } else {
                unionStreams.union(receiver);
            }
        }

        return unionStreams;
    }

ERROR:

    16/09/15 17:05:25 ERROR JobScheduler: Error in job generator
    java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    16/09/15 17:05:25 INFO MemoryStore: ensureFreeSpace(15128) called with curMem=520144, maxMem=555755765
    16/09/15 17:05:25 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 14.8 KB, free 529.5 MB)
    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:270)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

What's the correct way to do this?

Upvotes: 0

Views: 379

Answers (1)

Hokam

Reputation: 924

Can you please try the code below? It should solve your problem:

    private JavaDStream<SparkFlumeEvent> getEventsNotWorking(JavaStreamingContext jssc, List<String> hosts, List<String> ports) {

        List<JavaDStream<SparkFlumeEvent>> receivers = new ArrayList<JavaDStream<SparkFlumeEvent>>();

        for (String host : hosts) {
            for (String port : ports) {
                receivers.add(FlumeUtils.createStream(jssc, host, Integer.parseInt(port)));
            }
        }

        // Union the first stream with all of the remaining streams in one call.
        return jssc.union(receivers.get(0), receivers.subList(1, receivers.size()));
    }
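
`JavaStreamingContext.union` takes the first stream plus a list of the remaining streams, so it handles any number of receivers known only at runtime. The loop in your question fails because `union` returns a new DStream rather than modifying the stream it's called on, so the result of `unionStreams.union(receiver)` is silently discarded. If you prefer the loop style, a minimal sketch that reassigns the result (reusing the same `receivers` list built above) should also work:

    JavaDStream<SparkFlumeEvent> unionStreams = receivers.get(0);
    for (int i = 1; i < receivers.size(); i++) {
        // Reassign: union returns a new DStream instead of mutating in place.
        unionStreams = unionStreams.union(receivers.get(i));
    }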

Upvotes: 0
