I'm trying to send ProducerRecords of my custom type to Kafka, but I'm getting the error:
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
I set up the schemas in Schema Registry:
GET http://localhost:8081/subjects/documentCreations-key/versions/3
Response:
{
"subject": "documentCreations-key",
"version": 3,
"id": 1,
"schema": "\"string\""
}
GET http://localhost:8081/subjects/documentCreations-value/versions/4
Response:
{
"subject": "documentCreations-value",
"version": 4,
"id": 23,
"schema": "{\"type\":\"record\",\"name\":\"Document\",\"namespace\":\"com.bade\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"path\",\"type\":\"string\"}]}"
}
Here is my Scala class:
class Document(val name: java.lang.String,
val title: java.lang.String,
val path: java.lang.String)
And the part with KafkaProducer:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MyKafkaProducer {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://localhost:8081")
  private val producer = new KafkaProducer[java.lang.String, Document](props)

  def sendCreateDocumentMessage(document: Document): RecordMetadata = {
    val documentRecord = new ProducerRecord[java.lang.String, Document](
      SharedConfig.documentCreationsTopic, document.name, document)
    // Block until the broker acknowledges the record
    producer.send(documentRecord).get()
  }
}
What am I missing? I see that I can implement SpecificRecord for my class, but I didn't see that as necessary in the books/tutorials that I've been reading. Thanks!
EDITED: Fixed class name
Answering my own question. Apparently, KafkaAvroSerializer does not (de)serialize arbitrary classes automatically (via reflection or the like); you have to generate the class from an Avro schema file. Posting my pom.xml in case it is helpful to someone:
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!--force java 8-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.3.1</version>
</plugin>
<plugin>
<!-- Build an executable JAR -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<mainClass>Main</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>src/main/avro</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!--force discovery of generated classes-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/avro</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<properties>
<kafka.version>1.0.0</kafka.version>
<confluent.version>4.0.0</confluent.version>
<avro.version>1.8.2</avro.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
</dependencies>
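For illustration, here is a rough sketch of how the producer could look once the avro-maven-plugin has generated a Java class from the schema. It assumes the schema file under src/main/avro matches the one registered for documentCreations-value, so that the generated class is com.bade.Document with name and path fields. The topic name "documentCreations" is inferred from the subject names and stands in for SharedConfig.documentCreationsTopic; the method signature is hypothetical.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

// Assumption: this is the Java class generated by avro-maven-plugin from the
// registered schema (namespace com.bade, record Document, fields name and path).
import com.bade.Document

class MyKafkaProducer {
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer")
  props.put("schema.registry.url", "http://localhost:8081")

  private val producer = new KafkaProducer[String, Document](props)

  // Hypothetical signature: takes the raw field values instead of a hand-written class.
  def sendCreateDocumentMessage(name: String, path: String): RecordMetadata = {
    // Generated Avro classes come with a builder; build() fails if a field
    // without a default value is left unset.
    val document = Document.newBuilder().setName(name).setPath(path).build()
    // Assumption: topic name inferred from the documentCreations-* subjects.
    val record = new ProducerRecord[String, Document]("documentCreations", name, document)
    producer.send(record).get() // blocks until the broker acknowledges the record
  }
}
```

The generated class implements SpecificRecord, which extends IndexedRecord, so it falls under the supported types listed in the error message.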
I build it with the following mvn command:
mvn clean:clean avro:schema compiler:compile scala:compile jar:jar
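If generating classes is not an option, a possible alternative is to build a GenericRecord by hand, since GenericData.Record also implements IndexedRecord. The following is only a sketch under that assumption: the GenericDocument object is a hypothetical helper, and the schema string is copied from the documentCreations-value subject shown in the question.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

// Hypothetical helper that builds Avro records without generated classes.
object GenericDocument {
  // Same schema as the one registered under documentCreations-value.
  private val schemaJson =
    """{"type":"record","name":"Document","namespace":"com.bade",
      |"fields":[{"name":"name","type":"string"},{"name":"path","type":"string"}]}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaJson)

  def apply(name: String, path: String): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("name", name) // field names must match the schema
    record.put("path", path)
    record
  }
}
```

The producer would then be typed as KafkaProducer[String, GenericRecord], and the value passed to ProducerRecord would be GenericDocument(name, path). The trade-off is that field names are only checked at runtime rather than at compile time.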