Reputation: 203
I'm trying to merge two streams of the same data type. I saw the CoFlatMapFunction and tried it out, but I'm getting the following error:
"unspecified value parameters".
The code is written in Scala:
val eventsTypeOne: DataStream[Option[Event]] = patternStream1.select(pattern => selectFn1(pattern.toMap))
val eventsTypeTwo: DataStream[Option[Event]] = patternStream2.select(pattern => selectFn2(pattern.toMap))
eventsTypeOne.connect(eventsTypeTwo).flatMap(new CoFlatMapFunction[Option[Event], Option[Event], Option[Event]] {
  override def flatMap1(eventTypeOne: Option[Event], out: Collector[Option[Event]]): Unit = {
    out.collect(eventTypeOne)
  }
  override def flatMap2(eventTypeTwo: Option[Event], out: Collector[Option[Event]]): Unit = {
    out.collect(eventTypeTwo)
  }
})
}
How can I use the CoFlatMapFunction correctly? Or is there a more elegant way to merge two data streams?
Thanks in advance!
Upvotes: 3
Views: 900
Reputation: 2664
You can achieve the same result with the union() operator. union() merges two or more streams of the same type into a single stream, so no CoFlatMapFunction is needed.
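A minimal sketch of what that could look like, assuming a Flink version that still ships the Scala DataStream API. The Event case class and the fromElements sources are hypothetical stand-ins for the Event type and the two patternStream.select(...) results from the question, which are not shown in full there:

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the question's Event type, which is not shown there.
case class Event(name: String)

object UnionSketch {
  // Merges two streams of the same element type into a single stream.
  def merge(a: DataStream[Option[Event]],
            b: DataStream[Option[Event]]): DataStream[Option[Event]] =
    a.union(b)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder sources standing in for the two patternStream.select(...) results.
    val eventsTypeOne: DataStream[Option[Event]] =
      env.fromElements[Option[Event]](Some(Event("one")), None)
    val eventsTypeTwo: DataStream[Option[Event]] =
      env.fromElements[Option[Event]](Some(Event("two")))

    // Print the merged stream and run the job.
    merge(eventsTypeOne, eventsTypeTwo).print()
    env.execute("union-sketch")
  }
}
```

Note that union() makes no promise about the relative order of elements coming from the two inputs; they are interleaved as they arrive. connect() is mainly worth the extra code when the two streams have different types or need to share state.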
Upvotes: 4