Reputation: 993
I am trying to use ConstantInputDStream in java spark streaming program, but not able to do so. I am doing:
final SparkConf sparkConf2 = new SparkConf().setAppName("NetworkWordCount");
final JavaStreamingContext ssc2 = new JavaStreamingContext(sparkConf2, new Duration(10000));
final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = ssc2.sparkContext().parallelize(data);
final JavaDStream<Integer> numStream = new JavaDStream<Integer>(new ConstantInputDStream<Integer>(ssc2, distData));
But the final statement is giving compilation error: "error: constructor ConstantInputDStream in class ConstantInputDStream<'T'> cannot be applied to given types;"
What could be the problem and how to fix that?
Upvotes: 1
Views: 402
Reputation: 7790
I think the API may have changed. I took your code as my starting place and added Vishnu's answer and ended up with this:
try (JavaStreamingContext streamCtxt = new JavaStreamingContext(sparkContext, new Duration(1000))) {
final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = streamCtxt.sparkContext().parallelize(data);
ClassTag<Integer> evidence = ClassTag$.MODULE$.apply(Integer.class);
ConstantInputDStream<Integer> integerConstantInputDStream =
new ConstantInputDStream<>(streamCtxt.ssc(), distData.rdd(), evidence);
ArrayList<Integer> list = new ArrayList();
final JavaDStream<Integer> javaDStream = JavaDStream.fromDStream(integerConstantInputDStream, evidence);
javaDStream.foreachRDD(r -> list.addAll(r.collect()));
streamCtxt.start();
streamCtxt.awaitTerminationOrTimeout(2000);
streamCtxt.stop();
log.info("here is the list: " + list.stream().map(j->String.valueOf(j)).collect(Collectors.joining(",")));
}
The output is:
here is the list: 1,2,3,4,5,1,2,3,4,5
Upvotes: 0
Reputation: 735
You need to add
ClassTag<Integer> classTag = ClassTag$.MODULE$.apply(Integer.class);
final JavaDStream<Integer> numStream = new JavaDStream<Integer>(new ConstantInputDStream<Integer>(ssc2, distData,classTag));
Upvotes: 1