Reputation: 70
I have a map in Flink that gets activated once data comes through a stream.
I want to call that map even if no data comes through.
I moved the map into a function (an infinite function call), but then the Flink job never runs. And if I put it inside a map, it only gets activated if and when data comes through.
The idea is: have one map in an infinite loop checking some shared variable, and another Flink stream monitoring a Kafka queue. If data comes in, it is processed, the shared variable is changed in a way that affects the infinite loop, and the stream continues.
How do I call an infinite-loop map and run the Flink maps together?
I tried creating a collection-based stream with random data to activate the stream, and a map to call the infinite loop, but it exits almost immediately even though there is a while(true) condition within the map.
In the IDE it works; when I push it to local Flink it exits almost immediately instead of staying in the loop.
Stream 1
val data_stream = env.addSource(myConsumer)
.map(x => {process(x)})
Stream 2
val elements = List[String]("Start")
var read = env.fromElements(elements).map(x => ProcessData.infinteLoop())
Upvotes: 1
Views: 483
Reputation: 710
You can create a window and a trigger and call the map every x seconds.
You can find the documentation here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
Example:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

val data_stream = env.addSource(myConsumer)
  .map(x => {process(x)})

// Collect all elements into a single global window and fire (then purge) it
// after every 5 elements; the window function runs on each firing and emits
// its results through the Collector.
val window: DataStream[String] = data_stream
  .windowAll(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
  .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})
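If the goal is literally to fire every x seconds rather than after every 5 elements, the count trigger above can be swapped for a processing-time trigger. A minimal sketch under the same type assumptions as the example (the stream name periodic is just illustrative); note that ContinuousProcessingTimeTrigger only starts firing once at least one element has arrived:
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

// Fire (and purge) the global window every 5 seconds of processing time
// instead of after every 5 elements. The trigger registers its first timer
// when the first element arrives, so a completely empty stream never fires.
val periodic: DataStream[String] = data_stream
  .windowAll(GlobalWindows.create())
  .trigger(PurgingTrigger.of(ContinuousProcessingTimeTrigger.of[GlobalWindow](Time.seconds(5))))
  .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {
    // periodic work goes here; emit results with y.collect(...)
  })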
Upvotes: 1