radumanolescu

Reputation: 4161

Flink get RuntimeContext

In my Flink code I am using a custom input format, which throws an exception. It seems I need an instance of RuntimeContext, but how can I get one?

My format class looks like this:

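import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.core.fs.FileInputSplit

class MyInputFormat[T] extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {
  @transient var lineCounter: IntCounter = _
  override def open(split: FileInputSplit): Unit = {
    super.open(split)
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException
  }
  // readRecord(...) and the rest of the class omitted
}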

My main program looks like this:

val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception

The exception that gets thrown:

java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
    at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)

My class needs a RuntimeContext because it extends DelimitedInputFormat, which (through FileInputFormat) extends RichInputFormat:

public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
    private transient RuntimeContext runtimeContext;
    public void setRuntimeContext(RuntimeContext t)
    public RuntimeContext getRuntimeContext()

So it seems that every RichInputFormat instance expects setRuntimeContext(RuntimeContext t) to be called after it is created.

I expect I should be doing the following:

val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception

But how do I get an instance of RuntimeContext? The exception gets thrown because my custom input format does not have a RuntimeContext. I would set one, but I don't know where to get it.

Upvotes: 2

Views: 5208

Answers (3)

Scott

Reputation: 103

I ran into the same issue in Flink. It looks like setRuntimeContext is called automatically by Flink under the hood, and not during the open call; I could not find any obvious documentation explaining this. But you can do something like

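// evaluated on first access, not at construction; addAccumulator returns Unit, so return the counter itself
lazy val acc = { val counter = new IntCounter(); getRuntimeContext.addAccumulator(accName, counter); counter }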

in your class definition, and then call

acc.add(v)

at some other point in your code where the runtime context is guaranteed to be initialized, e.g. in one of the overridden methods of the Flink class.
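To see why the lazy val helps, here is a toy model in plain Scala. ToyCounter, ToyContext and LazyFormat are hypothetical stand-ins, not Flink classes; they only mimic a getter that throws until setRuntimeContext has been called. The lazy initializer runs on first access, so constructing the object never touches the context, and if that first access throws, Scala leaves the lazy val uninitialized and simply retries on the next access.

```scala
// Hypothetical toy stand-ins, NOT Flink's classes.
final class ToyCounter { var value = 0; def add(v: Int): Unit = value += v }

final class ToyContext {
  val registered = scala.collection.mutable.Map.empty[String, ToyCounter]
  def addAccumulator(name: String, acc: ToyCounter): Unit = registered(name) = acc
}

class LazyFormat {
  private var ctx: ToyContext = null
  def setRuntimeContext(c: ToyContext): Unit = ctx = c
  def getRuntimeContext: ToyContext =
    if (ctx == null) throw new IllegalStateException("The runtime context has not been initialized yet.")
    else ctx

  // The block runs on first access, not when the object is constructed.
  lazy val rowCounter: ToyCounter = {
    val c = new ToyCounter
    getRuntimeContext.addAccumulator("rowsInFile", c)
    c
  }

  def readRecord(line: String): String = {
    rowCounter.add(1) // first call registers the counter, later calls only increment it
    line
  }
}
```

Usage: construct the format, let the "framework" call setRuntimeContext, then read records; the counter is registered during the first readRecord call.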

Upvotes: 1

TobiSH

Reputation: 2921

You should access the RuntimeContext in a lifecycle method such as openInputFormat():

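import org.apache.flink.api.common.accumulators.IntCounter

class MyInputFormat[T] extends org.apache.flink.api.common.io.DelimitedInputFormat[T] {
  @transient var lineCounter: IntCounter = _
  override def openInputFormat(): Unit = { // the runtime context is available here
    super.openInputFormat()
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
  }
} // readRecord(...) omitted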
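A toy model of the difference between openInputFormat() and open(). It assumes, based on my reading of Flink's DataSourceTask (worth checking against the source for your Flink version), that for each parallel instance the runtime sets the context, calls openInputFormat() once, and then calls open() once per split. It also assumes that adding the same accumulator name twice to one context throws. ToyContext, ToyFormat, ToyDriver and the two subclasses are hypothetical, not Flink API.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a per-instance runtime context (not Flink's RuntimeContext).
final class ToyContext {
  private val names = mutable.Set.empty[String]
  // Assumption: mirrors Flink refusing a duplicate accumulator name within one context.
  def addAccumulator(name: String): Unit =
    if (!names.add(name))
      throw new UnsupportedOperationException(s"The accumulator '$name' already exists")
}

// Hypothetical stand-in for RichInputFormat: the getter throws until the context is set.
abstract class ToyFormat {
  private var ctx: ToyContext = null
  final def setRuntimeContext(c: ToyContext): Unit = ctx = c
  final def getRuntimeContext: ToyContext =
    if (ctx == null) throw new IllegalStateException("The runtime context has not been initialized yet.")
    else ctx
  def openInputFormat(): Unit = () // assumed: once per parallel instance
  def open(split: Int): Unit = ()  // assumed: once per split
}

object ToyDriver {
  // Assumed call order for one parallel instance: context, openInputFormat(), then open() per split.
  def runInstance(format: ToyFormat, splits: Seq[Int]): Unit = {
    format.setRuntimeContext(new ToyContext)
    format.openInputFormat()
    splits.foreach(format.open)
  }
}

class RegistersInOpenInputFormat extends ToyFormat {
  override def openInputFormat(): Unit = getRuntimeContext.addAccumulator("rowsInFile")
}

class RegistersInOpen extends ToyFormat {
  override def open(split: Int): Unit = getRuntimeContext.addAccumulator("rowsInFile")
}
```

Under these assumptions, an instance that reads several splits registers exactly once from openInputFormat(), while registering from open() fails as soon as the second split is opened.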

Upvotes: 1

radumanolescu

Reputation: 4161

I don't yet understand why, but it seems that MyInputFormat is being instantiated several times, including before the RuntimeContext is available. Despite this, the job works and computes what it needs to. I have worked around the problem by wrapping every call to addAccumulator(...) in a try, like so:

  import scala.util.control.NonFatal

  private def addAccumulator(accName: String, acc: SimpleAccumulator[_]): Unit = {
    try {
      val existing = getRuntimeContext.getAccumulator(accName) // throws if RuntimeContext not yet set
      if (existing == null) getRuntimeContext.addAccumulator(accName, acc)
    } catch {
      case NonFatal(_) => // no context yet: skip, a later call will add the accumulator
    }
  }

I need to do this even though I call addAccumulator(...) inside open(), which seems like the right lifecycle method. Also, because of parallelism, several sub-jobs were trying to add the same accumulator, which is wrong. This is why I try to get the accumulator first. If there is no context yet, no problem: I'll get one later. If the accumulator already exists, no problem: there is nothing to do. This is just a workaround, not a solution, but it's what I have for now.
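The get-or-add idea can be exercised without a cluster using toy stand-ins. ToyCounter, ToyContext, ToyFormat and addAccumulatorIfAbsent are hypothetical names, not Flink classes; the helper has the same shape as the workaround above: it skips silently when there is no context yet and does nothing when the name is already registered.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical toy stand-ins, NOT Flink's classes.
final class ToyCounter { var value = 0L; def add(v: Long): Unit = value += v }

final class ToyContext {
  private val registry = mutable.Map.empty[String, ToyCounter]
  def getAccumulator(name: String): ToyCounter = registry.getOrElse(name, null)
  def addAccumulator(name: String, acc: ToyCounter): Unit =
    if (registry.contains(name))
      throw new UnsupportedOperationException(s"The accumulator '$name' already exists")
    else registry(name) = acc
}

class ToyFormat {
  var ctx: ToyContext = null
  def getRuntimeContext: ToyContext =
    if (ctx == null) throw new IllegalStateException("The runtime context has not been initialized yet.")
    else ctx

  // Get first, add only if absent; any non-fatal error (e.g. no context yet) is swallowed.
  def addAccumulatorIfAbsent(name: String, acc: ToyCounter): Unit =
    try {
      if (getRuntimeContext.getAccumulator(name) == null)
        getRuntimeContext.addAccumulator(name, acc)
    } catch {
      case NonFatal(_) => // no context yet: a later call will register the accumulator
    }
}
```

One drawback of catching NonFatal is that it also hides unrelated failures, which is part of why this reads as a workaround rather than a fix.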

Upvotes: 0
