awenclaw

Reputation: 533

Transform an input stream into a stream of key-value pairs

I am new to Spark and Scala, so my question is probably rather easy, but I am still struggling to find an answer. I need to join two Spark streams, but I have trouble converting them to the appropriate format. Please see my code below:

val lines7 = ssc.socketTextStream("localhost", 9997)
val pairs7 = lines7.map(line => (line.split(" ")[0], line))

val lines8 = ssc.socketTextStream("localhost", 9998)
val pairs8 = lines8.map(line => (line.split(" ")[0], line))

val newStream = pairs7.join(pairs8)

This doesn't work because the "join" function expects streams of type DStream[String, String], while the result of the map function is DStream[(String, String)].

My question is: how should I write this map function to get the appropriate output? A short explanation would also be great.

Thanks in advance.

Upvotes: 0

Views: 983

Answers (1)

maasg

Reputation: 37435

This works as expected:

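import org.apache.spark.streaming.{Seconds, StreamingContext}

// `sc` is an existing SparkContext (for example, the one spark-shell provides)
val ssc = new StreamingContext(sc, Seconds(30))
// Key each line by its first space-separated token; note (0), not [0]
val lines7 = ssc.socketTextStream("localhost", 9997)
val pairs7 = lines7.map(line => (line.split(" ")(0), line))
val lines8 = ssc.socketTextStream("localhost", 9998)
val pairs8 = lines8.map(line => (line.split(" ")(0), line))
// Join the two pair streams by key, batch by batch
val newStream = pairs7.join(pairs8)

// Print the joined pairs of each batch on the driver
newStream.foreachRDD(rdd => println(rdd.collect().map(_.toString).mkString(",")))

ssc.start()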

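The only issue I see is a syntax error: line.split(" ")[0] should be line.split(" ")(0). Scala indexes arrays with parentheses and reserves square brackets for type parameters, so the compiler should have flagged the original. The map function itself is already right: join is available on a DStream of pairs, that is DStream[(String, String)], so no further conversion is needed. If join is still not found on an older Spark release, adding import org.apache.spark.streaming.StreamingContext._ may be needed to bring the pair operations into scope.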
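To see the indexing syntax and the pair shape without starting a streaming job, here is a minimal sketch on plain Scala collections. The names KeyedJoinSketch, toPair and innerJoin are made up for illustration and are not part of Spark; innerJoin only imitates, on in-memory sequences, the pairing by key that join does per batch.

```scala
object KeyedJoinSketch {
  // Key a line by its first space-separated token.
  // Arrays are indexed with (0) in Scala; [0] is not valid syntax.
  def toPair(line: String): (String, String) =
    (line.split(" ")(0), line)

  // Hypothetical stand-in for a keyed inner join, on plain sequences:
  // keep every (left, right) combination whose keys are equal.
  def innerJoin(
      left: Seq[(String, String)],
      right: Seq[(String, String)]
  ): Seq[(String, (String, String))] =
    for {
      (k1, v1) <- left
      (k2, v2) <- right
      if k1 == k2
    } yield (k1, (v1, v2))
}
```

For example, joining Seq("a 1", "b 2").map(KeyedJoinSketch.toPair) with Seq("a x", "c y").map(KeyedJoinSketch.toPair) keeps only the key "a". Note that a line consisting only of spaces splits into an empty array, so (0) would throw on it; filtering such lines out first is one way to guard against that.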

Upvotes: 1
