I need to merge two different DStreams.
One of the streams is of type org.apache.spark.streaming.dstream.DStream[String], and the other is of type org.apache.spark.streaming.dstream.DStream[twitter4j.Status].
I've tried:
val streamRDD = stream.union(sentiments)
But it fails to compile:
[error] found : org.apache.spark.streaming.dstream.DStream[String]
[error] required: org.apache.spark.streaming.dstream.DStream[twitter4j.Status]
[error] val streamRDD = stream.union(sentiments)
[error] ^
The problem is that union only works on two DStreams with the same element type (its signature is union(that: DStream[T]): DStream[T]). You have a DStream[String] and a DStream[twitter4j.Status], and String is not twitter4j.Status.
I assume you have the following types:
val stream: DStream[twitter4j.Status]
val sentiments: DStream[String]
You have different choices to fix this:
1. You are sure that String and twitter4j.Status should be mixed into one DStream because they represent the same information in your context: convert either DStream to match the other.
a) Convert stream to match sentiments. You need a conversion twitter4j.Status => String; possibly you can use _.toString (or _.getText, if only the tweet text matters), like this:
val stream2 = stream.map(_.toString)
val result = stream2.union(sentiments)
b) Convert sentiments to match stream, which requires a String => twitter4j.Status conversion.
2. String and twitter4j.Status are two different things in your context: you want to keep the distinction between them, but still combine them into one DStream.
In general you can use a sum type to represent each case. Here there are only two cases, so you can use the predefined Either:
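import org.apache.spark.streaming.dstream.DStream
type E = Either[String, twitter4j.Status]
type R = DStream[E] // shorter
val sentimentsL: R = sentiments.map(s => (Left(s): E)) // String -> Left
val streamR: R = stream.map(st => (Right(st): E))      // twitter4j.Status -> Right
val result: R = sentimentsL.union(streamR)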
At the end you will have one stream where each element is either a String wrapped in a Left or a twitter4j.Status wrapped in a Right, which lets you distinguish between the two when processing the stream.
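A minimal, self-contained sketch of the Either approach that runs without Spark. Assumptions: plain Scala Seqs stand in for the DStreams, the toy union function only mimics the same-element-type rule of DStream.union, and FakeStatus is a hypothetical stand-in for twitter4j.Status. It also shows one way to tell the two cases apart afterwards, by pattern matching on Left and Right:

```scala
object EitherUnionSketch {
  // Hypothetical stand-in for twitter4j.Status; only the tweet text is modeled.
  final case class FakeStatus(text: String)

  // Same role as the answer's element type: one type for both streams.
  type E = Either[String, FakeStatus]

  // Toy stand-in for DStream.union: like Spark's version,
  // both sides must have the same element type T.
  def union[T](a: Seq[T], b: Seq[T]): Seq[T] = a ++ b

  val sentiments: Seq[String] = Seq("positive", "negative")
  val stream: Seq[FakeStatus] = Seq(FakeStatus("I love Spark"))

  // Tag each element with its origin so both sides become Seq[E].
  val sentimentsL: Seq[E] = sentiments.map(s => (Left(s): E))
  val streamR: Seq[E] = stream.map(st => (Right(st): E))
  val merged: Seq[E] = union(sentimentsL, streamR)

  // Processing the merged stream: pattern matching recovers each case.
  val described: Seq[String] = merged.map {
    case Left(sentiment) => s"sentiment: $sentiment"
    case Right(status)   => s"status: ${status.text}"
  }

  // Splitting the merged stream back into its two parts.
  val onlySentiments: Seq[String] = merged.collect { case Left(s) => s }
  val onlyStatuses: Seq[FakeStatus] = merged.collect { case Right(st) => st }

  def main(args: Array[String]): Unit =
    described.foreach(println)
}
```

Running main prints the two tagged sentiments followed by the status text. On a real DStream[Either[String, twitter4j.Status]], the same pattern-matching function can be passed to map, since DStream.map takes an ordinary Scala function.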