Reputation: 21
I'm new to spark,now I want to transform two streams together,such as JavaNetworkWordCount
example,I receive two different streams :
JavaStreamingContext jssc = new JavaStreamingContext("local[2]", "JavaNetworkWordCount",new Duration(1000)); JavaReceiverInputDStream<String> lines1 = jssc.socketTextStream(ip1, port1); JavaReceiverInputDStream<String> lines2 = jssc.socketTextStream(ip2, port2); //can I union them like this in one driver program: JavaDStream<String> words = lines1.union(lines2); words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); } }); </code>
then do other transforms and action.I tested it and failed. I had read spark documentation, can't find an example.
Upvotes: 1
Views: 4563
Reputation: 1530
here's an example from the new Kinesis WordCount example:
the idea is to create a list of the streams, then call ssc.union(list). the scala version is a bit cleaner, but the idea is the same for both.
Upvotes: 3