Reputation: 533
I am new to Spark and Scala, so my question is probably rather easy, but I'm still struggling to find an answer. I need to join two Spark streams, but I'm having trouble converting them to the appropriate format. Please see my code below:
val lines7 = ssc.socketTextStream("localhost", 9997)
val pairs7 = lines7.map(line => (line.split(" ")[0], line))
val lines8 = ssc.socketTextStream("localhost", 9998)
val pairs8 = lines8.map(line => (line.split(" ")[0], line))
val newStream = pairs7.join(pairs8)
This doesn't work, because the "join" function expects streams in the format DStream[String, String], while the result of my map function is DStream[(String, String)].
My question is: how do I write this map function to get the appropriate output (a little explanation would also be great)?
Thanks in advance.
Upvotes: 0
Views: 983
Reputation: 37435
This works as expected:
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(30))
val lines7 = ssc.socketTextStream("localhost", 9997)
val pairs7 = lines7.map(line => (line.split(" ")(0), line))
val lines8 = ssc.socketTextStream("localhost", 9998)
val pairs8 = lines8.map(line => (line.split(" ")(0), line))
val newStream = pairs7.join(pairs8)
newStream.foreachRDD(rdd => println(rdd.collect.map(_.toString).mkString(",")))
ssc.start
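For some intuition on why this compiles: a DStream of key/value tuples, DStream[(K, V)], is implicitly enriched with PairDStreamFunctions, which is where join comes from. Within each batch it matches elements by key, much like a relational inner join. A minimal sketch of the same semantics on plain Scala collections (the sample lines here are made up for illustration):

```scala
// Per-batch join semantics, mirroring pairs7.join(pairs8):
// each stream element is (firstWord, wholeLine), and join
// pairs up values whose keys are equal.
val pairs7 = Seq(("foo", "foo bar"), ("baz", "baz qux"))
val pairs8 = Seq(("foo", "foo 42"))

val joined = for {
  (k1, v1) <- pairs7
  (k2, v2) <- pairs8
  if k1 == k2
} yield (k1, (v1, v2))
// joined contains ("foo", ("foo bar", "foo 42")); "baz" has
// no matching key in pairs8, so it is dropped.
```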
The only issue I see is a syntax error: line.split(" ")[0] should be line.split(" ")(0). But I guess the compiler would catch that.
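To spell out that detail: in Scala, square brackets are reserved for type parameters, so indexing into an array uses apply syntax with parentheses. A quick illustration:

```scala
// split returns an Array[String]; element access uses (i),
// which is sugar for .apply(i).
val words: Array[String] = "key rest of line".split(" ")
val key = words(0)   // same as words.apply(0)
// words[0] would not compile: Scala parses [0] as a type argument.
```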
Upvotes: 1