Reputation: 551
I have set up the Confluent Platform and started to develop a SourceConnector. In the corresponding SourceTask.poll() method I do the following (pseudo-Java code below):
public List<SourceRecord> poll() throws InterruptedException {
    ....
    // Serialize the envelope to Avro binary via reflection
    // (note: write/flush/close throw IOException, which poll() must handle)
    Envelope envelope = new Envelope();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
    DatumWriter<Envelope> dw = new ReflectDatumWriter<>(Envelope.class);
    dw.write(envelope, enc);
    enc.flush();
    out.close();

    Map<String, String> sourcePartition = new HashMap<>();
    sourcePartition.put("stream", streamName);

    Map<String, Long> sourceOffset = new HashMap<>();
    sourceOffset.put("position", Long.parseLong(envelope.getTimestamp()));

    // Pass the serialized bytes (not the Envelope object) to match BYTES_SCHEMA
    records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
            org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, out.toByteArray()));
....
I'd like to use the Schema Registry so that each object is serialized with a schema id from the registry before being published to the Kafka topic from poll(). If the schema for an object is not yet in the registry, I want it to be registered automatically and the generated id returned to the serializer, so that the id becomes part of the serialized bytes and the object stays deserializable.
What do I need to do in the code above to make this happen?
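(For context: in Kafka Connect, the Avro serialization and the registry round trip are normally delegated to the worker's converter rather than done inside poll(). A sketch of the relevant worker properties using Confluent's AvroConverter, assuming a registry at localhost:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

With this in place the connector returns Connect data from poll(), and the converter registers schemas and embeds the ids automatically.)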
Upvotes: 2
Views: 10982
Reputation: 17122
Per the documentation:
In the POM:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.1-cp1</version>
    <scope>provided</scope>
</dependency>
In the application, creating the producer:
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Set any other properties
KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);
Using the producer:
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// send() takes a ProducerRecord, not the value directly; the topic name is illustrative
Future<RecordMetadata> resultFuture =
        producer.send(new ProducerRecord<>("users", user1));
For this example the registry needs a schema for "User"; the serializer registers it automatically on the first send if it is not already there. A matching schema is sketched below.
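A minimal Avro schema matching this User class might look like the following (the field names are assumptions based on the setters above):

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]}
  ]
}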
Confluent also has a nice example on GitHub:
package io.confluent.examples.producer;

import JavaSessionize.avro.LogLine;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class AvroClicksProducer {
    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            System.out.println("Please provide command line arguments: schemaRegistryUrl");
            System.exit(-1);
        }
        String schemaUrl = args[0];

        Properties props = new Properties();
        // hardcoding the Kafka server URI for this example
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", schemaUrl);

        // Hard coding topic too.
        String topic = "clicks";

        // Hard coding wait between events so demo experience will be uniformly nice
        int wait = 500;

        Producer<String, LogLine> producer = new KafkaProducer<String, LogLine>(props);

        // Keep producing new events until someone hits Ctrl-C
        while (true) {
            // EventGenerator is a helper class from the same example project
            LogLine event = EventGenerator.getNext();
            System.out.println("Generated event " + event.toString());

            // Using IP as key, so events from same IP will go to same partition
            ProducerRecord<String, LogLine> record =
                    new ProducerRecord<String, LogLine>(topic, event.getIp().toString(), event);
            producer.send(record);
            Thread.sleep(wait);
        }
    }
}
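To read these events back, the matching KafkaAvroDeserializer fetches the writer's schema from the registry by the id embedded in each message. A minimal consumer sketch under the same assumptions (the group id is made up):

package io.confluent.examples.consumer;

import JavaSessionize.avro.LogLine;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class AvroClicksConsumerSketch {
    public static void main(String[] args) {
        String schemaUrl = args[0];

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "clicks-reader");
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaUrl);
        // Deserialize into the generated LogLine class instead of GenericRecord
        props.put("specific.avro.reader", "true");

        KafkaConsumer<String, LogLine> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("clicks"));
        while (true) {
            ConsumerRecords<String, LogLine> records = consumer.poll(500);
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}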
Upvotes: 0
Reputation: 423
Check out https://gist.github.com/avpatel257/0a88d20200661b31ab5f5df7adc42e6f for an example implementation.
You will need the following dependencies from Confluent to make it work.
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>common-config</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>common-utils</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.0.0</version>
</dependency>
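The kafka-schema-registry-client artifact above also lets you talk to the registry directly, which matches the register-if-absent requirement from the question. A minimal sketch, assuming a local registry and a subject name of "mytopic-value" (both assumptions):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public class RegisterSchemaSketch {
    public static void main(String[] args) throws Exception {
        // Cache up to 100 schema ids per subject locally
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Envelope\",\"fields\":"
              + "[{\"name\":\"timestamp\",\"type\":\"string\"}]}");

        // Registers the schema under the subject if it is new;
        // otherwise returns the id of the existing registration
        int id = client.register("mytopic-value", schema);
        System.out.println("schema id = " + id);
    }
}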
Upvotes: 2
Reputation: 1727
To use the Schema Registry you have to serialize/deserialize your data using the classes provided by Confluent: KafkaAvroSerializer and KafkaAvroDeserializer (in the io.confluent.kafka.serializers package).
Those classes contain all the logic to register and request the schemas from the registry.
If you use Maven, you can add this dependency:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>2.0.1</version>
</dependency>
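If you need the bytes outside a plain producer, as in the poll() method from the question, the serializer can also be invoked directly. A minimal sketch, assuming a local registry and a topic name of "mytopic" (both assumptions); note it accepts Avro GenericRecord/SpecificRecord values, not arbitrary POJOs:

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;

import java.util.Collections;

public class DirectSerializeSketch {
    static byte[] toBytes(GenericRecord avroRecord) {
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();
        // false = configure this instance as a value serializer (true would mean key)
        serializer.configure(
                Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
                false);
        // Registers the schema on first use and embeds the returned id in the
        // output (magic byte, 4-byte schema id, then the Avro-encoded payload)
        return serializer.serialize("mytopic", avroRecord);
    }
}

The resulting byte[] can then be handed to a SourceRecord with BYTES_SCHEMA, and any consumer using KafkaAvroDeserializer can resolve the schema from the embedded id.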
Upvotes: 3