Reputation: 513
I am seeing strange behavior in my Flink streaming job. This is my code:
streamExecutionEnvironment.enableCheckpointing(checkPointInterval, CheckpointingMode.EXACTLY_ONCE);
streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

ExecutionConfig executionConfig = streamExecutionEnvironment.getConfig();
executionConfig.disableForceKryo();
executionConfig.enableForceAvro();

Path path = new Path(outputPath);
CheckpointConfig config = streamExecutionEnvironment.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String mutateConfig = IOUtils.toString(EventProcessor.class.getClassLoader().getResourceAsStream(configFile));

FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);

DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("booking_flow_source");

DataStream<GenericRecord> enrichDataStream = dataStream.map(new MapFunction<GenericRecord, GenericRecord>() {
    private transient Mutator mutator;

    @Override
    public GenericRecord map(GenericRecord record) {
        GenericRecord mutateRecord = record;
        try {
            mutator = new Mutator(mutateConfig);
            mutateRecord = mutator.mutate(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return mutateRecord;
    }
});

enrichDataStream.print();
This code has worked fine until now. I now need to generate Java classes from my Avro schema, so I added this Avro dependency:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.9.1</version>
</dependency>
After adding this to my pom, the job stopped working and I get this exception:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
Even though I disable Kryo and force Avro in my code, I still get the same exception. If I remove this dependency, the code works and my stream is printed.
I don't understand what changes when I add the Avro dependency.
Please help.
Upvotes: 4
Views: 1375
Reputation: 1
The Avro version seems too high. Try Avro 1.8.2 instead of 1.9.1.
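To check which Avro jar actually ends up on the classpath at runtime, a small helper along these lines may be useful. This is only a sketch: `ClassOrigin` and `describe` are hypothetical names, not part of Flink or Avro, and the helper simply reports the code source and class loader of whatever class it is given.

```java
import java.security.CodeSource;

public class ClassOrigin {
    /** Returns a one-line description of where a class was loaded from. */
    public static String describe(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // Classes that belong to the JDK itself typically have no code source.
        String location = (source == null || source.getLocation() == null)
                ? "(bootstrap or unknown)"
                : source.getLocation().toString();
        // A null class loader means the class came from the bootstrap loader.
        ClassLoader loader = cls.getClassLoader();
        String loaderName = (loader == null) ? "bootstrap" : loader.getClass().getName();
        return cls.getName() + " loaded from " + location + " by " + loaderName;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass a fully qualified class name, e.g. org.apache.avro.Schema.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(name)));
    }
}
```

Logging `ClassOrigin.describe(org.apache.avro.Schema.class)` from inside the job (for example in the map function) should show whether the class comes from the job jar or from a jar shipped with Flink, and therefore which Avro version is in use.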
Upvotes: 0
Reputation: 11
I had a similar issue. I fixed it by setting classloader.resolve-order: parent-first in the Flink configuration.
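For reference, the setting mentioned above would look roughly like this, assuming the cluster is configured through flink-conf.yaml. Flink's default for this option is child-first, which means classes in the job jar take precedence over those on Flink's own classpath. Whether switching to parent-first resolves this particular exception depends on which Avro jars are present in the deployment.

```yaml
# flink-conf.yaml
# Resolve classes from Flink's classpath before the user job jar.
classloader.resolve-order: parent-first
```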
Upvotes: 1