Jagrati Gogia

How to merge multiple DStreams in spark using scala?

I have three incoming streams from Kafka. I parse the records received on each stream as JSON, extract them into the appropriate case classes, and form DStreams with the following schemas:

case class Class1(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String)

case class Class2(crt_object_id: String,
                  hangup_cause: String)

case class Class3(crt_object_id: String,
                  text: String)

I want to join these three DStreams on their common field, crt_object_id. The resulting DStream should have this form:

case class Merged(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String,
                  hangup_cause: String,
                  text: String)

How can I do this? I'm very new to both Spark and Scala.

Answers (1)

Alicia Garcia-Raboso

The Spark Streaming documentation tells you the signature of the join method:

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
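
To make the "all pairs of elements for each key" wording concrete, here is a plain-Scala sketch that mimics what join does to the two RDDs of a single batch, using local Seqs instead of Spark. The object name PairJoinModel and the sample data are hypothetical; this models the semantics only and is not Spark's implementation.

```scala
// Plain-Scala model of pair-join semantics (hypothetical helper, not Spark code).
object PairJoinModel {
  // Inner join of two key-value collections: for every key present on both
  // sides, emit one (k, (v, w)) pair for each combination of values.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] = {
    // Group the right side by key so each left record can find its partners.
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, v) <- left
      w      <- rightByKey.getOrElse(k, Seq.empty) // no partner -> no output
    } yield (k, (v, w))
  }

  def main(args: Array[String]): Unit = {
    val left  = Seq("a" -> 1, "a" -> 2, "b" -> 3)
    val right = Seq("a" -> "x", "c" -> "y")
    println(join(left, right))
  }
}
```

Key "a" appears twice on the left and once on the right, so it yields two output pairs; "b" and "c" have no partner on the other side and are dropped, because join is an inner join.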

Notice that join works on DStreams of key-value pairs, not on DStreams of arbitrary case classes. So you have to extract the field you want to join on from each case class and use it as the key, join the streams, and then pack the result into the appropriate case class.

import org.apache.spark.streaming.dstream.DStream

case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

val stream1: DStream[Class1] = ...
val stream2: DStream[Class2] = ...
val stream3: DStream[Class3] = ...

val transformedStream1: DStream[(String, Class1)] = stream1.map {
    c1 => (c1.crt_object_id, c1)
}
val transformedStream2: DStream[(String, Class2)] = stream2.map {
    c2 => (c2.crt_object_id, c2)
}
val transformedStream3: DStream[(String, Class3)] = stream3.map {
    c3 => (c3.crt_object_id, c3)
}

val joined: DStream[(String, ((Class1, Class2), Class3))] =
    transformedStream1.join(transformedStream2).join(transformedStream3)

val merged: DStream[Merged] = joined.map {
    case (crt_object_id, ((c1, c2), c3)) =>
        Merged(c1.incident_id, crt_object_id, c1.source,
               c1.order_number, c2.hangup_cause, c3.text)
}
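
A DStream join is applied batch by batch: in each batch interval, the RDD from one stream is joined with the RDD from the other, so a Class1 record and a Class2 record are only matched if they arrive in the same batch (applying a window to the streams before joining widens that range). The sketch below models one batch of the pipeline above with local Seqs, so the key-extract, join, and pack steps can be checked without a cluster. MergeBatchModel, mergeBatch, and the sample values are hypothetical, made up for illustration.

```scala
// Plain-Scala model of ONE micro-batch of the pipeline above (hypothetical, not Spark code).
object MergeBatchModel {
  case class Class1(incident_id: String, crt_object_id: String,
                    source: String, order_number: String)
  case class Class2(crt_object_id: String, hangup_cause: String)
  case class Class3(crt_object_id: String, text: String)
  case class Merged(incident_id: String, crt_object_id: String,
                    source: String, order_number: String,
                    hangup_cause: String, text: String)

  // Inner join on local Seqs, standing in for the per-batch DStream join.
  private def join[K, V, W](l: Seq[(K, V)], r: Seq[(K, W)]): Seq[(K, (V, W))] = {
    val byKey = r.groupBy(_._1)
    l.flatMap { case (k, v) =>
      byKey.getOrElse(k, Seq.empty).map { case (_, w) => (k, (v, w)) }
    }
  }

  def mergeBatch(b1: Seq[Class1], b2: Seq[Class2], b3: Seq[Class3]): Seq[Merged] = {
    // Key every record by crt_object_id, as the map { c => (c.crt_object_id, c) } calls do.
    val k1 = b1.map(c => (c.crt_object_id, c))
    val k2 = b2.map(c => (c.crt_object_id, c))
    val k3 = b3.map(c => (c.crt_object_id, c))
    // Chain the joins, then unpack the nested tuple into Merged.
    join(join(k1, k2), k3).map {
      case (id, ((c1, c2), c3)) =>
        Merged(c1.incident_id, id, c1.source, c1.order_number,
               c2.hangup_cause, c3.text)
    }
  }

  def main(args: Array[String]): Unit = {
    val merged = mergeBatch(
      Seq(Class1("inc-1", "obj-1", "web", "ord-1"),
          Class1("inc-2", "obj-2", "phone", "ord-2")),
      Seq(Class2("obj-1", "NORMAL_CLEARING")),
      Seq(Class3("obj-1", "hello"), Class3("obj-2", "orphan")))
    merged.foreach(println)
  }
}
```

In this sample, obj-2 has no Class2 record in the batch, so the inner join drops it. If such records must be kept, the pair-DStream API also offers leftOuterJoin, which yields an Option for the missing side; Merged would then need Option fields or default values.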
