Aditya Tiwari

Reputation: 11

Unable to use KafkaIO with Flink Runner

I am trying to use KafkaIO read with the Flink Runner on Beam version 2.45.0, and I am seeing the following error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No translator known for org.apache.beam.runners.core.construction.SplittableParDo$PrimitiveUnboundedRead
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:841)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1085)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1163)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1163)
Caused by: java.lang.IllegalStateException: No translator known for org.apache.beam.runners.core.construction.SplittableParDo$PrimitiveUnboundedRead
    at org.apache.beam.runners.core.construction.PTransformTranslation.urnForTransform(PTransformTranslation.java:283)
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:135)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
    at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
    at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:92)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at BeamPipelineKafka.main(BeamPipelineKafka.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more

Is there anything I am missing? Or is there some config to avoid using this SplittableParDo?

I am reading from Kafka in the following manner:

PipelineOptions options = PipelineOptionsFactory.create();

options.setRunner(FlinkRunner.class);

Pipeline pipeline = Pipeline.create(options);

pipeline
        // Read from the input Kafka topic
        .apply("Read from Kafka", KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("input-topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
                .apply ....

Upvotes: 1

Views: 158

Answers (1)

Joevanie

Reputation: 605

The error message indicates that the Flink runner has no translator for this transform (SplittableParDo$PrimitiveUnboundedRead), so Beam cannot translate it into a form that Flink can execute. A possible solution is to use a different runner that does have a translator for the transform, such as Dataflow.
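To make the failure mode concrete, here is a minimal, simplified model of a translator lookup. It is not Beam's actual code: the class TranslatorLookupSketch, the stand-in transform types ParDoLike and UnboundedReadLike, and the URN string are all hypothetical names chosen for illustration. It only assumes what the stack trace shows, namely that urnForTransform throws an IllegalStateException when it is given a transform for which nothing is registered.

```java
import java.util.HashMap;
import java.util.Map;

public class TranslatorLookupSketch {
    // Hypothetical stand-ins for transform types; these are NOT Beam classes.
    static class ParDoLike {}
    static class UnboundedReadLike {}

    // Maps a transform class to the identifier (URN) a runner knows how to translate.
    private final Map<Class<?>, String> knownTranslators = new HashMap<>();

    void register(Class<?> transformClass, String urn) {
        knownTranslators.put(transformClass, urn);
    }

    // Looks up the URN for a transform and fails if nothing was registered for its class.
    String urnForTransform(Object transform) {
        String urn = knownTranslators.get(transform.getClass());
        if (urn == null) {
            throw new IllegalStateException(
                "No translator known for " + transform.getClass().getName());
        }
        return urn;
    }

    public static void main(String[] args) {
        TranslatorLookupSketch registry = new TranslatorLookupSketch();
        // Only one of the two transform types is registered (hypothetical URN).
        registry.register(ParDoLike.class, "example:pardo");

        System.out.println(registry.urnForTransform(new ParDoLike()));
        try {
            registry.urnForTransform(new UnboundedReadLike());
        } catch (IllegalStateException e) {
            // Same shape of message as in the question's stack trace.
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the lookup fails only because nothing was registered for the second type. Whether the registration is truly absent in the Flink runner or simply not visible at runtime in the submitted job is not something the stack trace alone settles.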

Upvotes: 0
