"SerializationException: Unknown magic byte" with a working Producer/Consumer

I've checked the similar questions, and most of them had a serialization problem. In my case, however, I have a Producer/Consumer pair that works fine: I can send and retrieve data in Java, serializing and deserializing without issues.

When I try to save the data with the JDBC Sink Connector using this config:

curl -XPOST --header "Content-Type: application/json" XXXXXXXXXX.XXXXXXXXXX.com:8083/connectors -d
'{
    "name": "sink_AVROTESTNEW",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 3,
        "connection.url": "jdbc:oracle:thin:@XXXXXXXXXX:1854:XXXXXXXXXX",
        "connection.user": "XXXXXXXXXX",
        "connection.password": "XXXXXXXXXX",
        "table.name.format": "AVROTEST",
        "topics": "AVROTESTNEW",
        "auto.create": "false"
    }
}'

it fails with Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte! My Producer and Consumer code is taken straight from the Confluent website.
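For reference, the KafkaAvroSerializer writes the Confluent wire format (a magic byte 0x00 plus a 4-byte schema ID in front of the Avro payload), so one sanity check is to read the topic with the Avro console consumer; the hosts and ports below are the same placeholders as in my config:

kafka-avro-console-consumer --bootstrap-server XXXXXXXXXX:9072 --topic AVROTESTNEW --from-beginning --property schema.registry.url=http://XXXXXXXXXX:8071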

Producer:

package com.test;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;


public class Producer {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX:9072");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://XXXXXXXXXX.XXXXXXXX.XXXXXX:8071/");
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

        String key = "key2";
        String userSchema = "{\n" +
                "  \"type\": \"record\",\n" +
                "  \"name\": \"CUSTOMER\",\n" +
               "  \"namespace\": \"XXXXXXXXX\",\n" +
                "  \"fields\": [\n" +
                "    {\n" +
                "      \"name\": \"FIRST_NAME\",\n" +
                "      \"type\": \"string\"\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"LAST_NAME\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"HEIGHT\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"WEIGHT\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    },\n" +
                "    {\n" +
                "      \"name\": \"AUTOMATED_EMAIL\",\n" +
                "      \"type\": [\"null\",\"string\"]\n" +
                "    }\n" +
                "  ]\n" +
                "}\n";

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("FIRST_NAME", "X");
        avroRecord.put("LAST_NAME", "X");
        avroRecord.put("HEIGHT", "X");
        avroRecord.put("WEIGHT", "X");
        avroRecord.put("AUTOMATED_EMAIL", "X");


        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("AVROTESTNEW", key, avroRecord);
        try {
            producer.send(record);
        } catch (SerializationException e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }
}

Consumer:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {


        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "XXXXXXXXXX:9072");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put("schema.registry.url", "http://XXXXXXXXXX.XXXXXXX.XXXXX:8071/");

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "AVROTESTNEW";
        final org.apache.kafka.clients.consumer.Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

Upvotes: 1

Views: 3329

Answers (1)

Uhh, it was a stupid mistake. I solved it by changing the curl command to:

curl -XPOST --header "Content-Type: application/json"  XXXXXXXXXXXXXXXXX:8083/connectors  -d 
'{  
    "name": "sink_AVROTESTBUNNEW",  
    "config": {    
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",    
        "tasks.max": 3,   
        "connection.url": "jdbc:oracle:thin:@XXXXXXXX",
        "connection.user": "XXXXXXXXXXX",    
        "connection.password": "XXXXXXXXXX",    
        "table.name.format": "AVROTEST",
        "topics": "AVROTESTBUN",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://XXXXXXXXXXXXXXXXXX:8071",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable":"false",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "auto.create": true,
        "auto.evolve": true        
    }
}' 
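After POSTing the config, the connector and task state can be checked with the standard Connect REST status endpoint (host and connector name are the placeholders from above):

curl XXXXXXXXXXXXXXXXX:8083/connectors/sink_AVROTESTBUNNEW/status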

Explicitly defining the key and value converters (and the value converter's Schema Registry URL) solved it.
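As a side note, the same converters can also be set once as worker-level defaults so every connector inherits them unless overridden per connector. These are the standard Connect worker properties; the file location and values below are just a sketch for this setup:

# connect-distributed.properties (worker-level defaults)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://XXXXXXXXXX:8071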

Upvotes: 1
