Reputation: 4161
In my Flink code I am using a custom input format, which throws an exception. It seems I need an instance of RuntimeContext, but how can I get one?
My format class looks like this:
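import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.io.DelimitedInputFormat
import org.apache.flink.core.fs.FileInputSplit
class MyInputFormat[T] extends DelimitedInputFormat[T] { // readRecord(...) omitted
  @transient var lineCounter: IntCounter = _
  override def open(split: FileInputSplit): Unit = {
    super.open(split)
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter) // this line throws IllegalStateException
  }
}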
My main program looks like this:
val env = ExecutionEnvironment.getExecutionEnvironment
val format = new MyInputFormat
env.readFile(format, inputFile.getAbsolutePath) // throws exception
The exception that gets thrown:
java.lang.IllegalStateException: The runtime context has not been initialized yet. Try accessing it in one of the other life cycle methods.
at org.apache.flink.api.common.io.RichInputFormat.getRuntimeContext(RichInputFormat.java:51)
My class needs a RuntimeContext because it extends DelimitedInputFormat, which (through FileInputFormat) extends RichInputFormat:
public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT>
public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputSplit>
public abstract class RichInputFormat<OT, T extends InputSplit> implements InputFormat<OT, T>
private transient RuntimeContext runtimeContext;
public void setRuntimeContext(RuntimeContext t)
public RuntimeContext getRuntimeContext()
So any instance of RichInputFormat expects us to call setRuntimeContext(RuntimeContext t) after it is created.
I expect I should be doing the following:
val env = ExecutionEnvironment.getExecutionEnvironment
val runtimeContext: RuntimeContext = ??? // How do I get this?
val format = new MyInputFormat
format.setRuntimeContext(runtimeContext)
env.readFile(format, inputFile.getAbsolutePath) // no longer throws exception
But how do I get an instance of RuntimeContext?
The exception gets thrown because my custom input format does not have a RuntimeContext. I would set one, but I don't know where to get it.
Upvotes: 2
Views: 5208
Reputation: 103
I ran into this same issue in Flink. It looks like setRuntimeContext is called automatically by Flink under the hood, not during the open call, and I could not find any obvious documentation explaining this. But you can do something like
@transient lazy val acc: IntCounter = { val c = new IntCounter(); getRuntimeContext.addAccumulator(accName, c); c }
in your class definition, and then call
acc.add(v)
at some other point in your code where the runtime context is guaranteed to be initialized, e.g. in one of the overridden methods of the Flink class.
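The ordering this answer relies on can be sketched as a small pure-Scala model. It is not Flink code: `Counter`, `ModelContext`, `ModelFormat` and `ModelTask` are hypothetical stand-ins, and the call order in `ModelTask.run` (setRuntimeContext, then openInputFormat, then open(split) for each split, then closeInputFormat) is a simplified assumption about how Flink's DataSourceTask drives a RichInputFormat. The `lazy val` plays the role of `acc` above: it touches `getRuntimeContext` only on first use.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an accumulator: just a mutable count.
final class Counter { var value: Int = 0 }

// Hypothetical stand-in for RuntimeContext: a per-task registry of named accumulators.
final class ModelContext {
  val accumulators: mutable.LinkedHashMap[String, Counter] = mutable.LinkedHashMap.empty[String, Counter]
  def addAccumulator(name: String, acc: Counter): Unit = {
    // In this model, registering the same name twice on one context is an error.
    if (accumulators.contains(name)) throw new UnsupportedOperationException(s"'$name' already exists")
    accumulators(name) = acc
  }
}

// Hypothetical model of a rich input format: the context is injected by the driver, not by user code.
class ModelFormat {
  private var ctx: ModelContext = null
  val calls: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  def setRuntimeContext(c: ModelContext): Unit = { calls += "setRuntimeContext"; ctx = c }
  def getRuntimeContext: ModelContext =
    if (ctx == null) throw new IllegalStateException("The runtime context has not been initialized yet.")
    else ctx

  // Lazy registration: evaluated on first access only, i.e. after the driver has set the context.
  lazy val rows: Counter = { val c = new Counter; getRuntimeContext.addAccumulator("rowsInFile", c); c }

  def openInputFormat(): Unit = calls += "openInputFormat"
  def open(split: Int): Unit = calls += s"open($split)"
  def nextRecord(): Unit = rows.value += 1
  def closeInputFormat(): Unit = calls += "closeInputFormat"
}

// Hypothetical driver; the call order is an assumption modelled on Flink's DataSourceTask.
object ModelTask {
  def run(format: ModelFormat, splits: Seq[Seq[String]]): ModelContext = {
    val ctx = new ModelContext
    format.setRuntimeContext(ctx)
    format.openInputFormat()
    for ((records, i) <- splits.zipWithIndex) {
      format.open(i) // per-split close() omitted for brevity
      records.foreach(_ => format.nextRecord())
    }
    format.closeInputFormat()
    ctx
  }
}
```

In this model, calling `getRuntimeContext` on a fresh `ModelFormat` fails the same way as in the question, while the lazy registration inside `nextRecord` succeeds and runs only once, even though `open` is called for every split.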
Upvotes: 1
Reputation: 2921
You should register the accumulator in a lifecycle method such as openInputFormat, which Flink calls after it has set the RuntimeContext:
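class MyInputFormat[T] extends org.apache.flink.api.common.io.DelimitedInputFormat[T] { // readRecord(...) omitted
  @transient var lineCounter: org.apache.flink.api.common.accumulators.IntCounter = _
  override def openInputFormat(): Unit = {
    super.openInputFormat()
    lineCounter = new org.apache.flink.api.common.accumulators.IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
  }
}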
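A fuller sketch of the openInputFormat approach, assuming the DataSet API used in the question and the `flink-scala` and `flink-clients` dependencies on the classpath. `LineCountFormat` and `LineCountJob` are hypothetical names. The format registers the accumulator once per parallel instance in `openInputFormat`, counts records in `readRecord`, and the driver reads the merged total from the `JobExecutionResult`.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.io.DelimitedInputFormat
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._

// Hypothetical format: one String record per line, counted in an accumulator.
class LineCountFormat extends DelimitedInputFormat[String] {
  @transient private var lineCounter: IntCounter = _

  // Runs inside the task after the runtime context is set, once per parallel instance,
  // so the accumulator name is registered only once per task.
  override def openInputFormat(): Unit = {
    super.openInputFormat()
    lineCounter = new IntCounter()
    getRuntimeContext.addAccumulator("rowsInFile", lineCounter)
  }

  override def readRecord(reuse: String, bytes: Array[Byte], offset: Int, numBytes: Int): String = {
    if (lineCounter != null) lineCounter.add(1) // only count once openInputFormat has run
    new String(bytes, offset, numBytes, StandardCharsets.UTF_8)
  }
}

object LineCountJob {
  // Reads the file, discards the records, and returns the merged accumulator value.
  def countLines(path: String): Int = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.readFile(new LineCountFormat, path).output(new DiscardingOutputFormat[String])
    val result = env.execute("count lines")
    result.getAccumulatorResult[Int]("rowsInFile")
  }

  def main(args: Array[String]): Unit = println(countLines(args(0)))
}
```

Run it with the input path as the only argument. Because Flink merges accumulator values from all tasks, the printed number should be the total line count, regardless of parallelism.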
Upvotes: 1
Reputation: 4161
I don't yet understand why, but it seems that MyInputFormat is being instantiated several times, including before the RuntimeContext is available. Despite all this, the job works and computes what it needs to. I have worked around the problem by wrapping every call to addAccumulator in a try, like so:
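import org.apache.flink.api.common.accumulators.SimpleAccumulator
import scala.util.control.NonFatal
private def addAccumulator(accName: String, acc: SimpleAccumulator[_]): Unit = {
  try {
    // getRuntimeContext throws IllegalStateException if the RuntimeContext is not yet set
    val existing = getRuntimeContext.getAccumulator(accName)
    if (existing == null) getRuntimeContext.addAccumulator(accName, acc)
  } catch {
    case NonFatal(_) => // no context yet: ignore, a later call will register the accumulator
  }
}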
I need to do this even though I am calling addAccumulator inside open(), which seems like the right lifecycle method. Also, because of parallelism, several sub-jobs were trying to add the same accumulator, which is wrong. This is why I try to get the accumulator first: if there is no context yet, no problem, I'll get one later; if the accumulator already exists, there is nothing to do.
This is just a workaround, not a solution, but it's what I have for now.
Upvotes: 0