Reputation: 1570
I want to produce some generic data into a Kafka topic using Apache NiFi, and I want this data to be in Avro format. What I've done so far:
{ "type": "record", "name": "my_schema", "namespace": "my_namespace", "doc": "", "fields": [ { "name": "key", "type": "int" }, { "name": "value", "type": [ "null", "int" ] }, { "name": "event_time", "type": "long" } ] }
Then I try to read it using Kafka Streams:
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.log4j.Logger;
import io.confluent.examples.streams.utils.GenericAvroSerde;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

public class Test {
    private final static Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> source = builder.stream("topic");
        source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, properties);
        streams.start();
    }
}
GenericAvroSerde - https://github.com/JohnReedLOL/kafka-streams/blob/master/src/main/java/io/confluent/examples/streams/utils/GenericAvroSerde.java
And as a result I get these errors:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I also tried to explicitly set the Avro schema in the AvroReader/AvroRecordSetWriter services, but it didn't help. Also, if I simply read the bytes from the topic and convert them into a string representation, I get something like this:
Objavro.schema{"type":"record","name":"my_schema","namespace":"my_namespace","doc":"","fields":[{"name":"key","type":"int"},{"name":"value","type":["null","int"]},{"name":"event_time","type":"long"}]}avro.codecsnappyÛ4ým[©q ÃàG0 ê¸ä»/}½{Û4ým[©q ÃàG0
How can I fix this?
Upvotes: 1
Views: 3682
Reputation: 18630
In the PublishKafka processor, your Avro writer is configured with a "Schema Write Strategy" of "Embedded Avro Schema". This means every message being written to Kafka is a standard Avro object container file with the full schema embedded, which is exactly why your raw-byte dump starts with "Objavro.schema{...}".
On your consumer side (Kafka Streams), it looks like it is using the Confluent schema registry deserializer, which does not expect an embedded Avro schema: it expects a special sequence of bytes specifying a schema id, followed by the bare Avro message, and fails with "Unknown magic byte!" on anything else.
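As an illustration of the difference, here is a minimal diagnostic sketch (AvroFormatProbe is a hypothetical name I made up, not a library class) that tells the two formats apart from the leading bytes of a consumed record's value:

// Diagnostic sketch only: inspects the raw value bytes of a Kafka record.
public class AvroFormatProbe {

    public static String probe(byte[] value) {
        // Avro object container files begin with the magic bytes "Obj" + 0x01,
        // followed by file metadata (avro.schema, avro.codec) -- exactly what
        // the string dump in the question shows.
        if (value != null && value.length >= 4
                && value[0] == 'O' && value[1] == 'b' && value[2] == 'j' && value[3] == 1) {
            return "embedded-schema Avro (object container file)";
        }
        // The Confluent wire format begins with magic byte 0x00 and a 4-byte
        // schema id that the deserializer resolves against the registry; any
        // other first byte triggers "Unknown magic byte!" (and the id stays -1).
        if (value != null && value.length >= 5 && value[0] == 0) {
            return "Confluent Schema Registry wire format";
        }
        return "unknown";
    }
}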
Assuming you want to keep your consumer as is, then on the NiFi side you will want to change your Avro writer's "Schema Write Strategy" to "Confluent Schema Registry Reference". I think this might also require you to change the Avro reader to access the schema using a Confluent Schema Registry service.
Alternatively, maybe there is a way to make Kafka Streams read an embedded schema and not use the Confluent schema registry, but I have not used Kafka Streams before so I can't say if that is possible.
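If you did want to go that route, one possible approach is a custom value deserializer that reads the embedded schema with Avro's DataFileStream. A minimal sketch, assuming each Kafka message carries one container file holding one record, as NiFi wrote it (the class name EmbeddedSchemaAvroDeserializer is mine):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;

public class EmbeddedSchemaAvroDeserializer implements Deserializer<GenericRecord> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure -- the schema travels with the data
    }

    @Override
    public GenericRecord deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        // DataFileStream parses the container-file header ("Obj", schema, codec)
        // and then yields the records that follow it.
        try (DataFileStream<GenericRecord> records = new DataFileStream<>(
                new ByteArrayInputStream(data), new GenericDatumReader<GenericRecord>())) {
            // Assumes one record per Kafka message; if NiFi batches several
            // records into one message, you would need to handle all of them.
            return records.hasNext() ? records.next() : null;
        } catch (IOException e) {
            throw new RuntimeException("Failed to read embedded-schema Avro message", e);
        }
    }

    @Override
    public void close() {
    }
}

You could then wrap this together with a matching serializer via Serdes.serdeFrom(serializer, deserializer) and use it as the default value serde instead of GenericAvroSerde. Note that your dump shows avro.codec = snappy, so the snappy codec library would need to be on the consumer's classpath for DataFileStream to decode the blocks.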
Upvotes: 6