R. Mesa

Reputation: 13

How to merge two streams (RDDs) of different types

I need to merge two streams (DStreams) that have different element types.

One of the streams is of type org.apache.spark.streaming.dstream.DStream[String], and the other is of type org.apache.spark.streaming.dstream.DStream[twitter4j.Status].

I've tried:

  val streamRDD = stream.union(sentiments)

But it fails to compile:

[error]  found   : org.apache.spark.streaming.dstream.DStream[String]
[error]  required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status]
[error]       val streamRDD = stream.union(sentiments)
[error]                                    ^

Upvotes: 1

Views: 958

Answers (1)

Markus1189

Reputation: 2869

The problem is that union only works on two DStreams with the same element type. You have a DStream[String] and a DStream[twitter4j.Status], and a String is not a twitter4j.Status.
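The constraint comes from the signature: in Spark's Scala API, `DStream[T].union` takes another `DStream[T]` and returns a `DStream[T]`, and `DStream` is invariant in `T`. The Spark-free sketch below mimics that shape with a hypothetical `MiniStream` class (and a hypothetical `FakeStatus` standing in for `twitter4j.Status`) to show why the element types must match exactly.

```scala
// Hypothetical, Spark-free stand-in that mirrors the shape of DStream.union:
// the class is invariant in T, and union only accepts another stream of the same T.
final class MiniStream[T](val items: Seq[T]) {
  def union(that: MiniStream[T]): MiniStream[T] = new MiniStream(items ++ that.items)
}

object UnionDemo {
  // Hypothetical stand-in for twitter4j.Status.
  final case class FakeStatus(text: String)

  val stream: MiniStream[FakeStatus] = new MiniStream(Seq(FakeStatus("hello")))
  val sentiments: MiniStream[String] = new MiniStream(Seq("positive"))

  // Same element type on both sides: compiles.
  val doubled: MiniStream[String] = sentiments.union(sentiments)

  // Different element types: rejected by the compiler, like the error in the question.
  // val broken = stream.union(sentiments)
}
```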

I assume you have the following types:

val stream: DStream[twitter4j.Status]
val sentiments: DStream[String]

You have different choices to fix this:

    1. You are sure that String and twitter4j.Status should be mixed into one DStream because they represent the same information in your context: convert one of the DStreams to match the other.

      • a) convert stream to match sentiments. This needs a conversion twitter4j.Status => String; for example, you can use _.toString (or _.getText, if you only need the tweet text) like this:

        val stream2 = stream.map(_.toString)
        val result = stream2.union(sentiments)
        
      • b) convert sentiments to match stream, which requires a conversion String => twitter4j.Status.
    2. String and twitter4j.Status are two different things in your context: you want to keep them distinct but still combine them into one DStream.

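    In general you can use a sum type to represent each case. Here there are only two cases, so the predefined Either is enough:

    import org.apache.spark.streaming.dstream.DStream

    type E = Either[String, twitter4j.Status] // element type of the merged stream
    type R = DStream[E]                        // shorter
    val sentimentsL: R = sentiments.map[E](Left(_)) // each String goes into a Left
    val streamR: R = stream.map[E](Right(_))        // each twitter4j.Status goes into a Right
    val result: R = sentimentsL.union(streamR)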
    

    At the end you will have one stream, where each element is either a String wrapped in a Left or a twitter4j.Status wrapped in a Right, allowing you to distinguish between the two when processing the stream.
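When consuming the merged stream, a pattern match on Left/Right recovers each case. The sketch below is Spark-free so it runs on its own: FakeStatus is a hypothetical stand-in for twitter4j.Status, and a plain Seq (concatenated with ++) stands in for the DStream and its union. With Spark, the same describe function could be passed to result.map(...).

```scala
object EitherStreamDemo {
  // Hypothetical stand-in for twitter4j.Status, only so this sketch compiles without Twitter4J.
  final case class FakeStatus(text: String)

  type E = Either[String, FakeStatus]

  // Handle each element according to which side of the Either it came from.
  def describe(e: E): String = e match {
    case Left(sentiment) => s"sentiment: $sentiment"
    case Right(status)   => s"status: ${status.text}"
  }

  def main(args: Array[String]): Unit = {
    val sentiments: Seq[String] = Seq("positive", "negative")
    val statuses: Seq[FakeStatus] = Seq(FakeStatus("hello spark"))

    // Tag each side, then concatenate; ++ plays the role of DStream.union here.
    val merged: Seq[E] = sentiments.map(s => Left(s): E) ++ statuses.map(s => Right(s): E)

    // Prints "sentiment: positive", "sentiment: negative", "status: hello spark".
    merged.map(describe).foreach(println)
  }
}
```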

Upvotes: 2
