Prasad

Reputation: 103

Failing Apache Beam Pipeline when consuming events through KafkaIO on Flink runner

I have a Beam pipeline with several stages that consumes data through KafkaIO, and the code looks like this:

pipeline.apply("Read Data from Stream", StreamReader.read())
        .apply("Decode event and extract relevant fields", ParDo.of(new DecodeExtractFields()))
        .apply(...);

The StreamReader.read() method implementation:

public static KafkaIO.Read<String, String> read() {
    return KafkaIO.<String, String>read()
            .withBootstrapServers(Constants.BOOTSTRAP_SERVER)
            .withTopics(Constants.KAFKA_TOPICS)
            .withConsumerConfigUpdates(Constants.CONSUMER_PROPERTIES)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
  //Line-A  .withMaxReadTime(Duration.standardDays(10))
            .withLogAppendTime();
}

When running on the Direct Runner, the pipeline executes without throwing any errors. But in my case I have to use the Flink Runner, and when the pipeline runs on it, it throws the following error:

Exception in thread "main" java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaUnboundedSource@14b31e37
    at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators$ReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:336)
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:161)
....
    at Main.main(Main.java:6)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @2c34f934
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @2c34f934

The error can be resolved by un-commenting Line-A in the StreamReader.read() method above, but according to the docs, withMaxReadTime(...) should not be used for anything other than testing/demos.

The pipeline instantiation is done like this:

PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
pipelineOptions.setRunner(FlinkRunner.class);
Pipeline pLine = Pipeline.create(pipelineOptions);
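
An equivalent way to build the same options (a sketch using the Flink runner's typed options interface; FlinkPipelineOptions comes from the beam-runners-flink artifact) would be:

FlinkPipelineOptions pipelineOptions = PipelineOptionsFactory
        .fromArgs(args)                      // e.g. --runner=FlinkRunner on the command line
        .withValidation()
        .as(FlinkPipelineOptions.class);     // typed view over the same options
Pipeline pLine = Pipeline.create(pipelineOptions);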

Questions:

  1. Why does this error occur?
  2. How do I solve this problem?

If possible, please provide some resources on this.

Upvotes: 0

Views: 383

Answers (1)

Kenn Knowles

Reputation: 6023

The error appears not to be in Beam but in Flink's closure cleaner, which modifies private parts of user or SDK code. This appears to be a known issue with recent versions of Java and Flink. See Error message on example flink job: Unable to make field private final byte[] java.lang.String.value accessible
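
The usual workaround for this family of InaccessibleObjectException errors (a sketch, assuming you control the JVM arguments of both the submitting program and the Flink processes) is to explicitly open the package named in the message to unnamed modules:

--add-opens java.base/java.lang=ALL-UNNAMED

Locally, add this to the JVM arguments of the main program; on a Flink cluster it can be passed through the env.java.opts entry in flink-conf.yaml. Running on a JVM older than Java 16, where this kind of reflective access only triggers a warning rather than an error, also avoids the problem.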

Why does the commented line change things? Normally, when reading from Kafka, you read the stream in an unbounded read. When you specify withMaxReadTime, the read becomes bounded, so the translation to the underlying Flink operators is different.
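
To make the distinction concrete, here is an illustrative sketch (not from the original pipeline; the broker address and topic name are placeholders):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

KafkaIO.Read<String, String> base = KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")          // placeholder broker
        .withTopic("events")                             // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);

// Unbounded read: translated via UnboundedReadSourceTranslator,
// the code path that fails in the stack trace above.
pipeline.apply("Unbounded read", base);

// Bounded read: withMaxReadTime wraps the unbounded source in a bounded one,
// so Flink translates it to different operators.
pipeline.apply("Bounded read", base.withMaxReadTime(Duration.standardDays(10)));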

Upvotes: 1
