Ideas frontier
Ideas frontier

Reputation: 70

Can Flink Map be Called on and When Required (Not activated on input Stream)

I have a map in flink that gets activated once data comes through a stream.

I want to call that map even if no data come through.

I moved the map into a function (infinite function call) but then the flink job never runs. And if I add it within a map it will only get activated if and when data comes through.

The Idea is, have 1 map in an infinte loop, checking some shared variable and another flink stream monitoring a kafka queue, if data comes in it process it changes a shared variable that effects the infinite loop in some way and continues.

How Do I call an infinite loop map and run the flink maps together?

I tried creating a CollectionMap with random data to activate the stream and map to call the infinite loop, but exits almost immediately even though there is a while(true) condition within the map

In the IDE it works, when I push it to Flink.local it exits almost immediately not staying in loop

Stream 1

    val data_stream = env.addSource(myConsumer)
      .map(x => {process(x)})

Stream 2

    val elements = List[String]("Start")
    var read = env.fromElements(elements).map(x => ProcessData.infinteLoop())

How Do I call an infinite loop map and run the flink maps together?

Upvotes: 1

Views: 483

Answers (1)

You can create a window and a trigger and call the map every x seconds.

You can find the documentation heare: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

Example:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

 val data_stream = env.addSource(myConsumer)
  .map(x => {process(x)})

val window: DataStream[String] = data_stream
  .windowAll(GlobalWindows.create())
  .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
  .apply((w: GlobalWindow, x: Iterable[(Integer, String)], y: Collector[String]) => {})

Upvotes: 1

Related Questions