Kodo

Reputation: 551

How to use the AVRO serializer with Schema Registry from a Kafka Connect SourceTask

I have set up the Confluent Platform and started developing a SourceConnector. In the corresponding SourceTask.poll() method I do the following (pseudo Java code below):

    public List<SourceRecord> poll() throws InterruptedException {

....

    // Serialize the envelope to Avro binary
    Envelope envelope = new Envelope();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
    DatumWriter<Envelope> dw = new ReflectDatumWriter<>(Envelope.class);
    dw.write(envelope, enc);
    enc.flush();
    out.close();

    Map<String, String> sourcePartition = new HashMap<>();
    sourcePartition.put("stream", streamName);
    Map<String, Integer> sourceOffset = new HashMap<>();
    sourceOffset.put("position", Integer.parseInt(envelope.getTimestamp()));
    // BYTES_SCHEMA expects the serialized bytes as the value, not the POJO itself
    records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
            org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, out.toByteArray()));

....

I'd like to use the Schema Registry so that each object is tagged with a schema ID from the registry when it is serialized, and then published to the Kafka topic through the poll() method. If the schema for an object doesn't exist in the registry yet, I want it to be registered and the generated ID returned to the serializing code, so that the ID becomes part of the serialized payload and the object can later be deserialized.
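
If I understand correctly, what I'm describing is Confluent's wire format: a magic byte (0), the 4-byte big-endian schema ID from the registry, and then the Avro-encoded payload. A rough hand-rolled sketch of my understanding (assuming schemaId has already been obtained from the registry):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.avro.io.Encoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.reflect.ReflectDatumWriter;

    // Confluent wire format: magic byte 0, 4-byte big-endian schema id,
    // then the Avro binary encoding of the record.
    byte[] toWireFormat(Envelope envelope, int schemaId) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0);                                                     // magic byte
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array(), 0, 4); // schema id
        Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new ReflectDatumWriter<>(Envelope.class).write(envelope, enc);
        enc.flush();
        return out.toByteArray();
    }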

What do I need to do in the code above to make this happen?

Upvotes: 2

Views: 10982

Answers (3)

Pete

Reputation: 17122

Per the documentation:

In the POM:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.1-cp1</version>
    <scope>provided</scope>
</dependency>

In the application, creating the producer:

Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Set any other properties
KafkaProducer<String, User> producer = new KafkaProducer<>(props);

Using the producer:

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// send() takes a ProducerRecord and returns a Future<RecordMetadata>;
// "users" is an example topic name
Future<RecordMetadata> resultFuture =
        producer.send(new ProducerRecord<>("users", user1));

For this example the registry needs a schema for "User". With the serializer's default setting auto.register.schemas=true it is registered automatically on the first send; otherwise you can register it yourself, as sketched below.
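
If you prefer to register the schema explicitly, here is a sketch using Confluent's CachedSchemaRegistryClient (the user.avsc path and the users-value subject name are illustrative; exception handling elided):

    import java.io.File;
    import org.apache.avro.Schema;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    // Register the schema up front; the serializer will then resolve it by id.
    SchemaRegistryClient client =
            new CachedSchemaRegistryClient("http://localhost:8081", 100);
    Schema schema = new Schema.Parser().parse(new File("user.avsc"));
    int schemaId = client.register("users-value", schema); // subject = <topic>-value
    System.out.println("Registered User schema with id " + schemaId);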

Confluent also has a nice example on GitHub:

package io.confluent.examples.producer;

import JavaSessionize.avro.LogLine;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class AvroClicksProducer {

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            System.out.println("Please provide command line arguments: schemaRegistryUrl");
            System.exit(-1);
        }

        String schemaUrl = args[0];

        Properties props = new Properties();
        // hardcoding the Kafka server URI for this example
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", schemaUrl);

        // Hard coding topic too.
        String topic = "clicks";

        // Hard coding wait between events so demo experience will be uniformly nice
        int wait = 500;

        Producer<String, LogLine> producer = new KafkaProducer<String, LogLine>(props);

        // Keep producing new events, waiting between them, until someone hits Ctrl-C
        while (true) {
            LogLine event = EventGenerator.getNext();
            System.out.println("Generated event " + event.toString());

            // Using IP as key, so events from same IP will go to same partition
            ProducerRecord<String, LogLine> record = new ProducerRecord<String, LogLine>(topic, event.getIp().toString(), event);
            producer.send(record);
            Thread.sleep(wait);
        }
    }
}
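
(EventGenerator and the generated LogLine class come from that example project.) For completeness, the matching consumer side might look like the following; a sketch assuming the same registry URL, where specific.avro.reader=true makes the deserializer return LogLine instances instead of GenericRecord:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "clicks-reader"); // illustrative group id
    props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", schemaUrl);
    props.put("specific.avro.reader", "true");

    KafkaConsumer<String, LogLine> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("clicks"));
    while (true) {
        for (ConsumerRecord<String, LogLine> rec : consumer.poll(500)) {
            System.out.println("Consumed event " + rec.value());
        }
    }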

Upvotes: 0

apatel

Reputation: 423

Check out https://gist.github.com/avpatel257/0a88d20200661b31ab5f5df7adc42e6f for an example implementation.

You will need the following dependencies from Confluent to make it work.

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>common-config</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>common-utils</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>3.0.0</version>
    </dependency>

Upvotes: 2

fhussonnois

Reputation: 1727

To use the Schema Registry you have to serialize/deserialize your data using the classes provided by Confluent:

  • io.confluent.kafka.serializers.KafkaAvroSerializer
  • io.confluent.kafka.serializers.KafkaAvroDeserializer

Those classes contain all the logic to register schemas with the Registry and to fetch them back when deserializing.
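
In a SourceTask, where you don't own the producer, you can also use the serializer directly. A minimal sketch, assuming a registry at http://localhost:8081 and that Envelope is an Avro-generated SpecificRecord (the serializer handles Avro records and primitives, not arbitrary reflection-based POJOs):

    import java.util.HashMap;
    import java.util.Map;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://localhost:8081");

    KafkaAvroSerializer serializer = new KafkaAvroSerializer();
    serializer.configure(config, false); // false = configure as a value serializer

    // serialize() registers the schema under "<topic>-value" if it isn't known
    // yet, then prepends the magic byte and 4-byte schema id to the payload.
    byte[] payload = serializer.serialize("my-topic", envelope); // topic name is illustrative

The resulting bytes can then go into the SourceRecord with Schema.BYTES_SCHEMA, as in the question, and any consumer configured with KafkaAvroDeserializer will be able to decode them.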

If you use Maven, you can add this dependency:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>2.0.1</version>
</dependency>

Upvotes: 3
