I have three incoming streams from Kafka. I parse each stream's JSON records into the appropriate case class, which gives me DStreams with the following schemas:
case class Class1(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String)
case class Class2(crt_object_id: String,
                  hangup_cause: String)
case class Class3(crt_object_id: String,
                  text: String)
I want to join these three DStreams on their common column, crt_object_id. The resulting DStream should have this form:
case class Merged(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String,
                  hangup_cause: String,
                  text: String)
How can I do this? I'm very new to both Spark and Scala.
The Spark Streaming documentation gives the signature of the join method:

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
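To make "all pairs of elements for each key" concrete, here is a small local-collection model of what join does to the two RDDs of a single batch. The helper innerJoin and the sample data are hypothetical illustrations, not Spark API; Spark itself performs this per-batch join on distributed RDDs.

```scala
// Hypothetical local model of a pair join on one batch (not Spark's implementation).
// For every key present on BOTH sides, emit every (v, w) combination;
// keys missing from either side are dropped (inner-join semantics).
def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
  // Index the right side: key -> all right-hand values sharing that key
  val rightByKey: Map[K, Seq[W]] =
    right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
  for {
    (k, v) <- left
    w <- rightByKey.getOrElse(k, Seq.empty[W])
  } yield (k, (v, w))
}

// Hypothetical sample batches: key "x" appears twice on the left,
// "y" only on the left, "z" only on the right.
val batchA = Seq("x" -> 1, "x" -> 2, "y" -> 3)
val batchB = Seq("x" -> "a", "z" -> "b")
val joinedBatch = innerJoin(batchA, batchB)
```

Here joinedBatch contains ("x", (1, "a")) and ("x", (2, "a")): both left values for "x" are paired with the single right value, while "y" and "z" disappear because they have no partner on the other side.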
Notice that join operates on DStreams of key-value pairs, not on DStreams of case classes. So you have to extract the field you want to join on from each case class and use it as the key, join the keyed streams, and then pack each joined result into the Merged case class.
import org.apache.spark.streaming.dstream.DStream
// Before Spark 1.3, pair operations such as join also required:
// import org.apache.spark.streaming.StreamingContext._

case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

// The three parsed input streams
val stream1: DStream[Class1] = ...
val stream2: DStream[Class2] = ...
val stream3: DStream[Class3] = ...

// Key every record by the join column, crt_object_id
val transformedStream1: DStream[(String, Class1)] = stream1.map {
  c1 => (c1.crt_object_id, c1)
}
val transformedStream2: DStream[(String, Class2)] = stream2.map {
  c2 => (c2.crt_object_id, c2)
}
val transformedStream3: DStream[(String, Class3)] = stream3.map {
  c3 => (c3.crt_object_id, c3)
}

// Chaining two joins nests the values as ((Class1, Class2), Class3)
val joined: DStream[(String, ((Class1, Class2), Class3))] =
  transformedStream1.join(transformedStream2).join(transformedStream3)

// Flatten the nested tuple into the Merged case class
val merged: DStream[Merged] = joined.map {
  case (crt_object_id, ((c1, c2), c3)) =>
    Merged(c1.incident_id, crt_object_id, c1.source,
           c1.order_number, c2.hangup_cause, c3.text)
}
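Because join is an inner join applied batch by batch, a record only reaches merged if records with the same crt_object_id arrive in all three streams within the same batch. The sketch below runs the same key/join/flatten steps on plain Scala collections standing in for one micro-batch. The innerJoin helper and all sample values are hypothetical and only model Spark's behaviour locally.

```scala
case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

// Hypothetical local stand-in for join on one batch (inner join, not Spark API)
def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
  val rightByKey: Map[K, Seq[W]] =
    right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }
  for {
    (k, v) <- left
    w <- rightByKey.getOrElse(k, Seq.empty[W])
  } yield (k, (v, w))
}

// Hypothetical contents of one micro-batch from each stream
val batch1 = Seq(Class1("i1", "obj1", "web", "o1"), Class1("i2", "obj2", "phone", "o2"))
val batch2 = Seq(Class2("obj1", "NORMAL_CLEARING"), Class2("obj2", "USER_BUSY"))
val batch3 = Seq(Class3("obj1", "hello")) // obj2 has no Class3 record in this batch

// Key by crt_object_id, join twice, then flatten ((c1, c2), c3) into Merged
val keyed1 = batch1.map(c => c.crt_object_id -> c)
val keyed2 = batch2.map(c => c.crt_object_id -> c)
val keyed3 = batch3.map(c => c.crt_object_id -> c)

val mergedBatch: Seq[Merged] = innerJoin(innerJoin(keyed1, keyed2), keyed3).map {
  case (id, ((c1, c2), c3)) =>
    Merged(c1.incident_id, id, c1.source, c1.order_number, c2.hangup_cause, c3.text)
}
```

Only the obj1 record survives; obj2 is dropped because it has no matching Class3 record in this batch, even though it matched in the first join.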