Fernie
Fernie

Reputation: 33

Context in processFunction for KeyedProcessFunction is null

I am trying to use KeyedProcessFunction, but the ctx: Context variable in processFunction inside my KeyedProcessFunction is returning null. Note that I'm using the default TimeCharacteristic which is ProcessingTime (so I'm not even setting it).

I found this on stackoverflow but that one is relating to EventTime and not ProcessingTime.

Following the exact example of https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#example, I have created the following using Scala 2.11.12 and Flink 1.10, and I'm still getting the same error.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment


object example {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // the source data stream
    val stream = env.socketTextStream("localhost", 9999).map(x => {
      var splitCsv = x.stripLineEnd.split(",")
      (splitCsv(0), splitCsv(1))
    }
    )

    // apply the process function onto a keyed stream
    val result: DataStream[Tuple2[String, Long]] = stream
      .keyBy(0)
      .process(new CountWithTimeoutFunction())

    result.print()

    env.execute("Flink Streaming Demo STDOUT")

  }

  /**
   * The data type stored in the state
   */
  case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

  /**
   * The implementation of the ProcessFunction that maintains the count and timeouts
   */
  class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {

    /** The state that is maintained by this process function */
    lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
      .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


    override def processElement(
                                 value: (String, String),
                                 ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context,
                                 out: Collector[(String, Long)]): Unit = {

      // initialize or retrieve/update the state
      val current: CountWithTimestamp = state.value match {
        case null =>
          CountWithTimestamp(value._1, 1, ctx.timestamp)
        case CountWithTimestamp(key, count, lastModified) =>
          CountWithTimestamp(key, count + 1, ctx.timestamp)
      }

      // write the state back
      state.update(current)

      // schedule the next timer 60 seconds from the current event time
      ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
    }

    override def onTimer(
                          timestamp: Long,
                          ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext,
                          out: Collector[(String, Long)]): Unit = {

      state.value match {
        case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
          out.collect((key, count))
        case _ =>
      }
    }
  }
}

Here is the error:

Caused by: java.lang.NullPointerException at scala.Predef$.Long2long(Predef.scala:363) at com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:57) at com.leidos.example$CountWithTimeoutFunction.processElement(example.scala:42) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:748)

Any idea of what am I doing wrong? Thank you in advance!

Upvotes: 1

Views: 1552

Answers (1)

Till Rohrmann
Till Rohrmann

Reputation: 13346

The problem is that you are accessing in line 57 the timestamp field of the Context. This field is null if you are using ProcessingTime or if you don't specify a timestamp extractor when using EventTime.

Upvotes: 1

Related Questions