Reputation: 810
I've checked the similar questions, and most of them turned out to be serialization problems. In my case, however, I have a Producer and Consumer pair that works fine: I can send and retrieve data in Java by serializing and deserializing it.
When I try to save the data with the JDBC Sink Connector by using this config:
curl -XPOST --header "Content-Type: application/json" XXXXXXXXXX.XXXXXXXXXX.com:8083/connectors -d
'{
"name": "sink_AVROTESTNEW",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 3,
"connection.url": "jdbc:oracle:thin:@XXXXXXXXXX:1854:XXXXXXXXXX",
"connection.user": "XXXXXXXXXX",
"connection.password": "XXXXXXXXXX",
"table.name.format": "AVROTEST",
"topics": "AVROTESTNEW",
"auto.create": "false"
}
}'
The connector fails with the error "Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!". My producer and consumer code is taken directly from the Confluent website.
Producer:
package com.test;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;
/**
 * Sends a single Avro {@code CUSTOMER} record to the {@code AVROTESTNEW} topic.
 *
 * <p>The record key is written with {@code StringSerializer} and the value with
 * {@code KafkaAvroSerializer}; any consumer or connector reading the topic has to
 * be configured with the matching deserializers/converters.
 */
public class Producer {

    /** Topic the record is published to. */
    private static final String TOPIC = "AVROTESTNEW";

    /** Avro schema of the record value: one required and four nullable string fields. */
    private static final String USER_SCHEMA = "{\n" +
            " \"type\": \"record\",\n" +
            " \"name\": \"CUSTOMER\",\n" +
            " \"namespace\": \"XXXXXXXXX\",\n" +
            " \"fields\": [\n" +
            " {\n" +
            " \"name\": \"FIRST_NAME\",\n" +
            " \"type\": \"string\"\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"LAST_NAME\",\n" +
            " \"type\": [\"null\",\"string\"]\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"HEIGHT\",\n" +
            " \"type\": [\"null\",\"string\"]\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"WEIGHT\",\n" +
            " \"type\": [\"null\",\"string\"]\n" +
            " },\n" +
            " {\n" +
            " \"name\": \"AUTOMATED_EMAIL\",\n" +
            " \"type\": [\"null\",\"string\"]\n" +
            " }\n" +
            " ]\n" +
            "}\n";

    /**
     * Builds one record and publishes it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, " XXXXXXXXXX:9072");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http:// XXXXXXXXXX.XXXXXXXX.XXXXXX:8071/");

        // Build the record before opening the producer so that a schema or field
        // error cannot leave an open producer behind.
        String key = "key2";
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("FIRST_NAME", "X");
        avroRecord.put("LAST_NAME", "X");
        avroRecord.put("HEIGHT", "X");
        avroRecord.put("WEIGHT", "X");
        avroRecord.put("AUTOMATED_EMAIL", "X");
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(TOPIC, key, avroRecord);

        // try-with-resources closes the producer on every exit path.
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(record);
            producer.flush();
        } catch (SerializationException e) {
            // Report the failure instead of silently dropping the record.
            System.err.println("Failed to serialize record for topic " + TOPIC + ": " + e);
        }
    }
}
Consumer:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.consumer.*;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Reads Avro records from the {@code AVROTESTNEW} topic and prints each one.
 *
 * <p>Keys are read with {@code StringDeserializer} and values with
 * {@code KafkaAvroDeserializer}. The poll loop never exits on its own; the
 * consumer is closed only when the loop is left through an exception.
 */
public class Consumer {

    /** Topic this consumer subscribes to. */
    private static final String TOPIC = "AVROTESTNEW";

    /**
     * Subscribes to the topic and prints offset, key and value of every record received.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, " XXXXXXXXXX:9072");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http:// XXXXXXXXXX.XXXXXXX.XXXXX:8071/");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The Kafka interface is fully qualified because its simple name clashes with this class.
        // try-with-resources also covers subscribe(), so a failure there still closes the consumer.
        try (org.apache.kafka.clients.consumer.Consumer<String, GenericRecord> consumer =
                new KafkaConsumer<String, GenericRecord>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                // poll(Duration) replaces the deprecated poll(long) overload.
                ConsumerRecords<String, GenericRecord> records =
                        consumer.poll(java.time.Duration.ofMillis(100));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Upvotes: 1
Views: 3329
Reputation: 810
It was a simple mistake on my part. I solved it by changing the curl command to:
curl -XPOST --header "Content-Type: application/json" XXXXXXXXXXXXXXXXX:8083/connectors -d
'{
"name": "sink_AVROTESTBUNNEW",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 3,
"connection.url": "jdbc:oracle:thin:@XXXXXXXX",
"connection.user": "XXXXXXXXXXX",
"connection.password": "XXXXXXXXXX",
"table.name.format": "AVROTEST",
"topics": "AVROTESTBUN",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://XXXXXXXXXXXXXXXXXX:8071",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"auto.create": true,
"auto.evolve": true
}
}'
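Defining the key and value converters explicitly solved it: the producer writes the key with StringSerializer and the value with KafkaAvroSerializer, so the connector has to use StringConverter for the key and AvroConverter for the value rather than whatever converters it would otherwise fall back to.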
Upvotes: 1