Alfred

Reputation: 21406

How to produce a Kafka Avro record exactly like one produced using the Avro console producer?

I am using Confluent 3.3.0. My intention is to use Kafka Connect to insert values from a Kafka topic into an Oracle table. My connector works fine with the Avro record I produced using the Avro console producer, like below:

./kafka-avro-console-producer --broker-list 192.168.0.1:9092 --topic topic6 --property value.schema='{"type":"record","name":"flights3","fields":[{"name":"flight_id","type":"string"},{"name":"flight_to", "type": "string"}, {"name":"flight_from", "type": "string"}]}'

and I insert values like:

{"flight_id":"1","flight_to":"QWE","flight_from":"RTY"}

What I am trying to achieve is to insert the same data from a Java application, using objects. Below is my producer code:

public class Sender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "serializers.custom.FlightSerializer");
        props.put("schema.registry.url", "http://192.168.0.1:8081");
        Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);
        Flight myflight = new Flight("testflight1","QWE","RTY");
        ProducerRecord<String, Flight> record = new ProducerRecord<String, Flight>("topic5","key",myflight);

        try {
            producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Below is the Flight VO:

package vo;

public class Flight {
    String flight_id,flight_to,flight_from;

    public Flight(String flight_id, String flight_to, String flight_from) {
        this.flight_id = flight_id;
        this.flight_to = flight_to;
        this.flight_from = flight_from;
    }

    public Flight(){
    }

    public String getFlight_id() {
        return flight_id;
    }

    public void setFlight_id(String flight_id) {
        this.flight_id = flight_id;
    }

    public String getFlight_to() {
        return flight_to;
    }

    public void setFlight_to(String flight_to) {
        this.flight_to = flight_to;
    }

    public String getFlight_from() {
        return flight_from;
    }

    public void setFlight_from(String flight_from) {
        this.flight_from = flight_from;
    }
}

and finally, the Serializer:

package serializers.custom;

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import vo.Flight;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FlightSerializer implements Serializer<Flight> {
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
    }

    @Override
    public byte[] serialize(String arg0, Flight arg1) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();

        try {
            retVal = objectMapper.writeValueAsString(arg1).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return retVal;
    }
}

But what I understood is that a schema needs to be defined, and that some Avro serializer must be used to get exactly the same data as with the Avro console producer. I have gone through some example code, but none of it worked for me.

Edit

I tried the following code, but nothing comes through in the Avro console consumer.

package producer.serialized.avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class Sender {
    public static void main(String[] args) {
        String flightSchema = "{\"type\":\"record\",\"name\":\"flights\","
                + "\"fields\":[{\"name\":\"flight_id\",\"type\":\"string\"},{\"name\":\"flight_to\",\"type\":\"string\"},{\"name\":\"flight_from\",\"type\":\"string\"}]}";
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://192.168.0.1:8081");
        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(flightSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("flight_id", "1");
        avroRecord.put("flight_to", "QWE");
        avroRecord.put("flight_from", "RTY");
        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic6", avroRecord);

        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Upvotes: 4

Views: 9057

Answers (2)

OneCricketeer

Reputation: 191854

exact data like I did using avro console consumer

You can take a peek at the source code for that.


Assuming you want to use Generic Records, this is all correct:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092");
...
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://192.168.0.1:8081");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);

...

GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("flight_id", "1");
avroRecord.put("flight_to", "QWE");
avroRecord.put("flight_from", "RTY");
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("topic6", avroRecord);

try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

But you are missing calls to producer.flush() and producer.close() at the end to actually send the batch of records.
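For example, a minimal sketch of the fix, reusing the producer and record from above:

try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.flush(); // push any buffered records out to the broker
    producer.close(); // also flushes, then releases the client's resources
}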

Upvotes: 0

Saïd Bouras

Reputation: 296

The schema is not defined, so when KafkaAvroSerializer has to contact the Schema Registry to submit the schema, it will not have it.
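For reference, the serializer registers the value schema under the subject "<topic>-value" automatically on the first send. A minimal sketch of doing that registration by hand with Confluent's client (the cache capacity of 100 and the flightSchemaJson variable here are illustrative assumptions):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;

// Mirrors what KafkaAvroSerializer does on first send; the subject
// follows the "<topic>-value" naming convention.
CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://192.168.0.1:8081", 100);
Schema schema = new Schema.Parser().parse(flightSchemaJson);
int schemaId = client.register("topic6-value", schema);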

You have to create a schema for your Flight object.

An example file.avdl (one of the Avro file extensions), like the one below, will be fine:

@namespace("vo")
protocol FlightSender {

    record Flight {
       union{null, string} flight_id = null;
       union{null, string} flight_to = null;
       union{null, string} flight_from = null;
    }
}

See Avro IDL docs

At compile time, when you use the avro-maven-plugin, the Avro schema above will generate your Java Flight class, so you have to delete the one you created previously.

As for your main class, you have to set the two properties like below:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class); 

And in your producer, you can use your generated, specific Avro class:

Producer<String, Flight> producer = new KafkaProducer<String, Flight>(props);
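For example, a minimal sketch of sending with the generated class (the builder and the camel-cased setter names below assume Avro's default field-name mapping for generated code):

// Flight here is the class generated from the .avdl above, not the
// hand-written VO; setter names are assumed from Avro's usual codegen.
Flight flight = Flight.newBuilder()
        .setFlightId("1")
        .setFlightTo("QWE")
        .setFlightFrom("RTY")
        .build();

producer.send(new ProducerRecord<>("topic6", flight));
producer.flush();
producer.close();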

Hope that helps :-)

Upvotes: 1
