Anuj jain

Reputation: 513

Flink throwing com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException

I am seeing strange behavior in my Flink streaming job. This is my code:

        streamExecutionEnvironment.enableCheckpointing(checkPointInterval, CheckpointingMode.EXACTLY_ONCE);
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        ExecutionConfig executionConfig = streamExecutionEnvironment.getConfig();
        executionConfig.disableForceKryo();
        executionConfig.enableForceAvro();
        Path path = new Path(outputPath);
        CheckpointConfig config = streamExecutionEnvironment.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        String mutateConfig = IOUtils.toString(EventProcessor.class.getClassLoader().getResourceAsStream(configFile));

        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("booking_flow_source");


        DataStream<GenericRecord> enrichDataStream = dataStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            private transient Mutator mutator;
            @Override
            public GenericRecord map(GenericRecord record)  {
                GenericRecord mutateRecord=record;
                try {
                    mutator = new Mutator(mutateConfig);
                    mutateRecord = mutator.mutate(record);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return mutateRecord;
            }
        });

        enrichDataStream.print();

This code worked fine until now. I now have a requirement to generate Java classes from my Avro schema, so I added this Avro dependency:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>

After adding this to my pom, the job stopped working and I get this exception:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)

Even though I disable Kryo and force Avro in my code, I still get the same exception. If I remove this dependency, the code works and my stream is printed.

So I am unable to understand what changes when I add the Avro dependency.

Please help.

Upvotes: 4

Views: 1375

Answers (2)

zhangfangkai

Reputation: 1

The Avro version seems too high. You can try Avro 1.8.2 instead of 1.9.1.

Upvotes: 0

Sergio Martínez

Reputation: 11

I had a similar issue. I fixed it by setting classloader.resolve-order: parent-first in the Flink configuration.
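
Since this setting changes which class loader wins when the same class exists in both the job jar and the Flink distribution, it may help to first check where the Avro classes are actually loaded from. The following is a minimal diagnostic sketch, not part of the original answer: ClassOrigin and describe are hypothetical names, and it uses only the standard Java API. It is assumed that you would call describe("org.apache.avro.Schema") from inside the job (for example, once in a map function) and compare the output with and without the extra dependency.

```java
import java.security.CodeSource;

// Hypothetical helper (not from the question or answers): reports which jar
// and which class loader a class comes from, to spot duplicate Avro versions.
public class ClassOrigin {

    static String describe(String className) {
        try {
            Class<?> c = Class.forName(className);

            // CodeSource is null for classes loaded by the bootstrap loader.
            CodeSource src = c.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                    ? "(bootstrap or unknown)"
                    : src.getLocation().toString();

            // Implementation-Version comes from the jar manifest and may be null.
            Package pkg = c.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();

            return className + " loaded from " + location
                    + ", version " + version
                    + ", loader " + c.getClassLoader();
        } catch (ClassNotFoundException e) {
            return className + " not found on the classpath";
        }
    }

    public static void main(String[] args) {
        // Assumption: Avro is on the classpath when this runs inside the job;
        // otherwise the "not found" message is printed.
        System.out.println(describe("org.apache.avro.Schema"));
    }
}
```

If the reported location or version differs from the Avro jar that ships with your Flink setup, that would be consistent with a class-loading conflict, which is the situation the resolve-order setting and the version downgrade both try to address.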

Upvotes: 1
