ScalaNewbie

Reputation: 203

Apache Flink: Merge two DataStreams with a CoFlatMapFunction

I'm trying to merge two streams of the same data type. I saw the CoFlatMapFunction and tried it out, but I'm getting the following error:

"unspecified value parameters".

The code is written in Scala:

val eventsTypeOne: DataStream[Option[Event]] = patternStream1.select(pattern => selectFn1(pattern.toMap))
val eventsTypeTwo: DataStream[Option[Event]] = patternStream2.select(pattern => selectFn2(pattern.toMap))

eventsTypeOne.connect(eventsTypeTwo).flatMap(new CoFlatMapFunction[Option[Event], Option[Event], Option[Event]] {
  override def flatMap1(eventTypeOne: Option[Event], out: Collector[Option[Event]]): Unit = {
    out.collect(eventTypeOne)
  }

  override def flatMap2(eventTypeTwo: Option[Event], out: Collector[Option[Event]]): Unit = {
    out.collect(eventTypeTwo)
  }
})

How can I use the CoFlatMapFunction correctly? Or is there a more elegant way to merge two data streams?

Thanks in advance!

Upvotes: 3

Views: 900

Answers (1)

twalthr

Reputation: 2664

You can achieve the same result with the union() operator. union() merges two or more streams of the same type into a single stream, so no CoFlatMapFunction is needed here.
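As a rough sketch of what that could look like, assuming the Flink Scala DataStream API: the Event case class and the fromElements sources below are hypothetical stand-ins for the Event type and the two pattern-select streams from the question, only there to make the snippet self-contained.

```scala
// The wildcard import brings in the implicit TypeInformation that the Scala API needs.
import org.apache.flink.streaming.api.scala._

// Hypothetical stand-in for the Event type used in the question.
case class Event(id: Long, name: String)

object UnionExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical sources standing in for eventsTypeOne and eventsTypeTwo.
    val eventsTypeOne: DataStream[Option[Event]] =
      env.fromElements[Option[Event]](Some(Event(1L, "a")), None)
    val eventsTypeTwo: DataStream[Option[Event]] =
      env.fromElements[Option[Event]](Some(Event(2L, "b")))

    // union() requires both streams to have the same element type
    // and forwards every element from either input unchanged.
    val merged: DataStream[Option[Event]] = eventsTypeOne.union(eventsTypeTwo)

    merged.print()
    env.execute("union-example")
  }
}
```

Note that union() gives no ordering guarantee between elements of the two inputs. connect() with a CoFlatMapFunction is mostly worth it when the two streams have different types or need different handling.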

Upvotes: 4
