BountyHunter
BountyHunter

Reputation: 121

Kafka Avro Serializer and deserializer exception. Avro supported types

I am seeing following error

exception Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

my kafka producer props are

Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");
    props.put("producer.type", "sync");
    props.put(ProducerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);

    Producer<String, TweetInfoDto> producer = new KafkaProducer(props);

and my kafka consumer props are

Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "twitterCrawler");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, KafkaConstants.CLIENT_ID);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put("schema.registry.url", "http://localhost:8081");
    props.put("value.converter.schema.registry.url", "http://localhost:8081");

    Consumer<String, TweetInfoDto> consumer = new KafkaConsumer(props);

not sure what am I doing wrong.

Upvotes: 1

Views: 2314

Answers (2)

OneCricketeer
OneCricketeer

Reputation: 191671

TweetInfoDto cannot be a plain Java object that you have defined yourself.

It ideally should be created from an Avro schema via the Avro Maven Plugin, for example.

Please refer to the Schema Registry Tutorial for all the steps including defining an AVSC, and generating a Java class for it.

Tutorial sample code here

Upvotes: 1

user10486861
user10486861

Reputation:

Adding on to what cricket_007 has mentioned, could consider using avro tools - Serializing and deserializing with code generation

Upvotes: 0

Related Questions