Pfav

Reputation: 23

Useful Debugging in Flink

I am working on a new Flink streaming application and cannot get the debugger to step through the more critical sections of my code.

Here is my main program (with some parts removed):

def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.createLocalEnvironment()
  env.setStateBackend(new RocksDBStateBackend(statePath))

  env.addSource(new KafkaConsumer().getKafkaKeyedConsumer(inTopic, inBrokers))
    .map { tup =>
      (tup._2.get("payload").get("itemId").asText,
       tup._2.get("payload").get("version").asLong,
       tup._2,
       tup._1)
    }
    .keyBy(0)
    .flatMap(new FilterPastVersions())
    .print()

  env.execute("My Program")
}

And here is the FilterPastVersions class:

class FilterPastVersions extends RichFlatMapFunction[(String, Long, ObjectNode, String), (String, ObjectNode)] {

  private var version: ValueState[Long] = _

  override def flatMap(input: (String, Long, ObjectNode, String), out: Collector[(String, ObjectNode)]): Unit = {

    // access the state value
    val tmpCurrentVersion = version.value()

    // If it hasn't been used before, it will be null
    if (tmpCurrentVersion == null || input._2 > tmpCurrentVersion){
      version.update(input._2)
      out.collect((input._4, input._3))
    }
  }

  override def open(parameters: Configuration): Unit = {
    val versionDesc = new ValueStateDescriptor[Long]("version", createTypeInformation[Long])
    versionDesc.setQueryable("version-state")

    version = getRuntimeContext.getState(versionDesc)
  }
}

If I put a breakpoint at every line in the main function, execution does stop at each one. However, no data is actually processed until env.execute is called, so those breakpoints never show any data flowing through the pipeline.

If I put breakpoints in the flatMap function of FilterPastVersions, those breakpoints are never hit, even though the program does successfully print messages from Kafka.

Am I missing something here, or is this a limitation of Flink? I am using IntelliJ and have tried this both with a remote debugger and by clicking the debug button on the application's run configuration.

Upvotes: 2

Views: 1307

Answers (1)

pradithya aria

Reputation: 687

This is because Flink programs are executed lazily: your debugging session in main only steps through the declaration of the pipeline. The stream processing itself happens inside the execute() method.

See the documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html

All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program’s plan. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
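As a minimal self-contained sketch of this behaviour (a made-up pipeline, not your Kafka job), the following local program shows where breakpoints become useful: stepping through main only declares the pipeline, while a breakpoint inside the map body is hit only after execute() starts the job, and on a task thread rather than in main:

import org.apache.flink.streaming.api.scala._

object LazyExecutionDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    // These lines only *declare* the pipeline; stepping through them
    // shows no data, exactly as you observed in your main method.
    val doubled = env
      .fromElements(1, 2, 3)
      .map { n =>
        // A breakpoint here is hit only after execute() below starts
        // the local job, and it fires on a task thread, not in main.
        n * 2
      }

    doubled.print()

    // Execution is triggered here; the map above runs now.
    env.execute("lazy-execution-demo")
  }
}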

Upvotes: 1
