董鑫功

Reputation: 21

Spark Streaming: how to merge different streams into one stream

I'm new to Spark. I want to process two streams together. For example, building on the JavaNetworkWordCount example, I receive two different streams:

JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "JavaNetworkWordCount", new Duration(1000));
JavaReceiverInputDStream<String> lines1 = jssc.socketTextStream(ip1, port1);
JavaReceiverInputDStream<String> lines2 = jssc.socketTextStream(ip2, port2);
// Can I union them like this in one driver program?
JavaDStream<String> lines = lines1.union(lines2);
// Split each line of the merged stream into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });

Then I apply other transformations and actions. I tested this and it failed. I have read the Spark documentation but can't find an example.

Upvotes: 1

Views: 4563

Answers (1)

Chris Fregly

Reputation: 1530

Here's how the new Kinesis WordCount example does it:

Java version: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java#L130

Scala version: https://github.com/apache/spark/blob/ae58aea2d1435b5bb011e68127e1bcddc2edf5b2/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L116

The idea is to create a list of the streams, then call ssc.union(list). The Scala version is a bit cleaner, but the idea is the same for both.
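Applied to the two socket streams from the question, the list-then-union idea could look roughly like the sketch below. It assumes the Spark 1.x/2.x Java API, where `JavaStreamingContext.union` takes the first stream plus a list of the rest (as far as I know, Spark 3.0 and later take varargs instead). The class name `UnionStreamsSketch`, the helper `unionAll`, and the host/port values are made up for illustration. The master is `local[3]` rather than `local[2]` because each socket receiver occupies a thread, and at least one more is needed for processing.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class UnionStreamsSketch {

  // Hypothetical helper: merge every stream in the list into one DStream.
  // Assumes the Spark 1.x/2.x signature union(first, restAsList).
  static JavaDStream<String> unionAll(JavaStreamingContext jssc,
                                      List<JavaDStream<String>> streams) {
    if (streams.size() == 1) {
      return streams.get(0);
    }
    return jssc.union(streams.get(0), streams.subList(1, streams.size()));
  }

  public static void main(String[] args) throws Exception {
    // Two receivers need two threads; the third thread processes the batches.
    JavaStreamingContext jssc =
        new JavaStreamingContext("local[3]", "UnionStreamsSketch", new Duration(1000));

    // Placeholder hosts and ports; substitute the real ip1/port1 and ip2/port2.
    List<JavaDStream<String>> streams = new ArrayList<JavaDStream<String>>();
    streams.add(jssc.socketTextStream("localhost", 9998));
    streams.add(jssc.socketTextStream("localhost", 9999));

    // One stream containing the lines from both sockets.
    JavaDStream<String> lines = unionAll(jssc, streams);
    lines.print();

    jssc.start();
    jssc.awaitTermination();
  }
}
```

For just two streams, `lines1.union(lines2)` on the stream itself should give the same result; the list form mainly pays off when the number of streams is only known at runtime.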

Upvotes: 3
