Reputation: 63
I am trying to learn Kafka and now Avro, To keep consistency between the sender object and receiver object, we keep a JSON schema (.avsc). but I am not able to find any simple example of how to use it. some example is using confluent (is confluent mandate for Avro), some are generating object via Avro tool. so far I have a working Kafka setup.
Object class
package com.example.kafka;
public class Hello {
String name;
String age;
public Hello(String name, String age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "Hello{" +
"name='" + name + '\'' +
", date='" + age + '\'' +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
}
Controler Class
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/")
class KafkaController {
@Autowired
KafkaService kafkaService;
@GetMapping("test")
public Hello hello() {
Hello hello = new Hello("shrikant", "25");
kafkaService.hello(hello);
return hello;
}
}
Main Application
package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableAutoConfiguration
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
KafkaProducerConfig
package com.example.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Hello> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Hello> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaSerializer
package com.example.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class KafkaSerializer implements Serializer<Hello> {
@Override
public byte[] serialize(String arg0, Hello developer) {
byte[] serializedBytes = null;
ObjectMapper objectMapper = new ObjectMapper();
try {
serializedBytes = objectMapper.writeValueAsString(developer).getBytes();
} catch (Exception e) {
e.printStackTrace();
}
return serializedBytes;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void configure(Map<String, ?> arg0, boolean arg1) {
// TODO Auto-generated method stub
}
}
KafkaService
package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, Hello> kafkaTemplate;
public void hello(Hello hello) {
kafkaTemplate.send("test", hello);
}
}
Hello.avsc
{"namespace": "com.example.kafka",
"type": "record",
"name": "Hello",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "string"},
]
}
build.gradle
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.5.21.RELEASE'
}
}
plugins {
id 'java'
}
apply plugin: 'org.springframework.boot'
group = 'com.example.kafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
mavenCentral()
}
ext {
set('spring-kafka.version', "2.2.8.RELEASE")
}
dependencies {
compile 'org.springframework.boot:spring-boot-starter'
compile 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-web'
}
it's working setup and I am able to send and receive data, what changes I need to make in order to get Avro working.
Upvotes: 1
Views: 7550
Reputation: 191743
Confluent maintains tutorials for exactly this use case: https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java-springboot.html
You're currently using only JSON, not "JSON schema". To use Avro easily with your current setup, you'd have to import the Jackson Avro data format Objectmapper
.
https://github.com/FasterXML/jackson-dataformats-binary/blob/master/avro/README.md
Alternatively (recommended) you can install/run the Confluent Schema Registry and use their serializers without having to write your own for every object class you want. Confluent provides KafkaAvroSerializer
class for this purpose so you don't need to implement your own.
To actually use the AVSC, you will need to read the file from the filesystem to create a Schema object, or can use the Avro Gradle Plugin to have it generate a object class for you rather than you writing one manually, which will have the schema embedded as a variable. https://github.com/commercehub-oss/gradle-avro-plugin
The Confluent examples use the Avro Maven plugin, but the idea would be similar.
Note that using the Jackson Avro encoded messages are not compatible with Confluent ones, as the Confluent serialized messages do not contain any Avro schema themselves, so you cannot mix those (de)serializers
Upvotes: 3