Reputation: 481
How can I generate Java classes from an Avro JSON schema (.avsc) and use the generated classes to decode a message read by a Kafka client?
I've found some partial examples that read the data from a file, but nothing that works with Kafka as the source (without a schema registry).
Upvotes: 0
Views: 1372
Reputation: 481
Thank you ChristDist!
I am posting the following code as a compact example of how to use the generated classes:
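import java.nio.charset.StandardCharsets;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
// Assumption: Hex comes from Bouncy Castle; any hex encoder will do.
import org.bouncycastle.util.encoders.Hex;
// "message" is the class generated from the .avsc; import it from the
// namespace declared in your schema.

public class Main {
    // Formats a record timestamp (epoch milliseconds) for printing.
    public static String convertTime(long time) {
        Date date = new Date(time);
        Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
        return format.format(date);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "your broker");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-username\" password=\"your-password\";");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // Read the value as raw bytes; the Avro decoding is done by hand below.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        String topic = "messages-payments";
        final Consumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            for (int i = 0; i < 1000; i++) {
                // poll(long) is deprecated since kafka-clients 2.0;
                // with newer clients use poll(Duration.ofMillis(10000)).
                ConsumerRecords<String, byte[]> records = consumer.poll(10000);
                System.out.println("************************************************************************");
                System.out.println("!!!!!!!records length: " + records.count());
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.printf("offset = %d, date = %s, key = %s, value = %s \n", record.offset(),
                            convertTime(record.timestamp()),
                            record.key(),
                            new String(record.value(), StandardCharsets.UTF_8));
                    System.out.println("HEX:" + new String(Hex.encode(record.value())));
                    // The reader takes its schema from the generated class.
                    DatumReader<message> reader = new SpecificDatumReader<>(message.class);
                    try {
                        Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                        message m = reader.read(null, decoder);
                        System.out.println(
                                "DECODED: " + m.getId() + " " + m.getContentType() + " " + m.getFeatureLevelType());
                    } catch (Exception e) {
                        System.out.println("DECODER ERROR: " + e.getMessage());
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            // Always release the consumer's network resources.
            consumer.close();
        }
    }
}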
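As a side note on why the generated class (and therefore the schema) is needed on the consumer side: Avro's binary encoding, as described in the Avro specification, contains only the field values in schema order, with no field names or type tags. The following is a minimal sketch of that wire format using only the JDK. It is not the Avro library's implementation, and the two-field record (id as a long, contentType as a string) is a hypothetical layout chosen for illustration; the real field types of the generated message class may differ.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class AvroBinarySketch {

    // Avro writes int/long values as zigzag-encoded variable-length integers.
    public static void writeLong(ByteArrayOutputStream out, long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // more bytes follow
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
    }

    public static long readLong(ByteBuffer in) {
        long zigzag = 0;
        int shift = 0;
        int b;
        do {
            b = in.get() & 0xFF;
            zigzag |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (zigzag >>> 1) ^ -(zigzag & 1); // undo zigzag
    }

    // A string is its UTF-8 byte length (as a long) followed by the bytes.
    public static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    public static String readString(ByteBuffer in) {
        byte[] utf8 = new byte[(int) readLong(in)];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Hypothetical record {id: long, contentType: string}: a record is just
    // its fields written one after another in schema order.
    public static byte[] encode(long id, String contentType) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, contentType);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = encode(42L, "text/plain");
        // Without knowing the schema, these bytes cannot be interpreted.
        ByteBuffer in = ByteBuffer.wrap(payload);
        long id = readLong(in);
        String contentType = readString(in);
        System.out.println(id + " " + contentType);
    }
}
```

The reader has to consume the fields in exactly the order and with exactly the types the writer used, which is what SpecificDatumReader does with the schema embedded in the generated class. If decoding fails with the real messages, a mismatch between the writer's schema and the .avsc used for code generation is one plausible cause to check.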
Upvotes: 3
Reputation: 768
If your project is built with Maven, you can add the following plugin to your pom.xml to generate the Avro classes automatically from your .avsc files.
<plugin>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-maven-plugin</artifactId>
    <version>${avro.version}</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>schema</goal>
            </goals>
            <configuration>
                <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                <stringType>String</stringType>
                <fieldVisibility>PRIVATE</fieldVisibility>
            </configuration>
        </execution>
    </executions>
</plugin>
I found this to be a more complete sample, which you can try: http://www.javawenti.com/?post=16104 ("How to encode/decode Kafka messages using Avro binary encoder?").
Upvotes: 0