erKito

Reputation: 481

Read an AVRO encoded byte array record

How can I generate classes from an Avro JSON schema and use the generated classes to decode a message received from a Kafka client?

I've found some partial examples that read data from a file, but nothing that works with Kafka as the source (without a schema registry).

Upvotes: 0

Views: 1372

Answers (2)

erKito

Reputation: 481

Thank you, ChristDist!

I'm posting the following code as a compact example of how to use the generated classes:

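import java.nio.charset.StandardCharsets;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
// Assumption: Hex is the Bouncy Castle encoder; swap this import if you use another Hex utility.
import org.bouncycastle.util.encoders.Hex;
// "message" is the class generated from the .avsc; import it from the namespace declared in your schema.

public class Main {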

    public static String convertTime(long time) {
        Date date = new Date(time);
        Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
        return format.format(date);
    }

    public static void main(String[] args) {
        Properties props = new Properties();

        

        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
                "your broker");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your-username\" password=\"your-password\";");

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class);

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "messages-payments";
        final Consumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            for (int i = 0; i < 1000; i++) {
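                // poll(long) is deprecated in newer Kafka clients; use the java.time.Duration overload
                ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(10000));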
                System.out.println("************************************************************************");
                System.out.println("!!!!!!!records length: " + records.count());
                for (ConsumerRecord<String, byte[]> record : records) {
                    System.out.printf("offset = %d, date = %s, key = %s, value = %s \n", record.offset(),
                            convertTime(record.timestamp()),
                            record.key(),
                            new String(record.value(), StandardCharsets.UTF_8));
                    System.out.println("HEX:" + new String(Hex.encode(record.value())));

                    DatumReader<message> reader = new SpecificDatumReader<>(message.class);
                    try {
                        Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                        message m = reader.read(null, decoder);

                        System.out.println(
                                "DECODED: " + m.getId() + " " + m.getContentType() + " " + m.getFeatureLevelType());
                    } catch (Exception e) {
                        System.out.println("DECODER ERROR: " + e.getMessage());
                        e.printStackTrace();
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}
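
As a side note on why the generated class (or at least the schema) is required: an Avro binary record is just the field values written one after another in schema order, with no field names or type tags in the bytes. That is also why printing record.value() as a UTF-8 string looks like garbage. Below is a minimal sketch, using only the JDK, that hand-decodes two Avro primitives. The class AvroLayoutSketch and the record {id: string, count: long} are hypothetical and only there for illustration; in real code keep using SpecificDatumReader as in the example above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Avro): hand-decodes two Avro primitives
// to show how field values are laid out in the raw byte[] read from Kafka.
final class AvroLayoutSketch {

    // Avro long: variable-length, 7 payload bits per byte, least significant
    // group first, high bit set on every byte except the last; zig-zag coded.
    static long readLong(ByteBuffer in) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.get() & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zig-zag
    }

    // Avro string: a long byte count followed by that many UTF-8 bytes.
    static String readString(ByteBuffer in) {
        int length = (int) readLong(in);
        byte[] utf8 = new byte[length];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical record {id: string, count: long} with id="ab", count=3.
        byte[] payload = {0x04, 'a', 'b', 0x06};
        ByteBuffer in = ByteBuffer.wrap(payload);
        // Fields must be read in schema order; nothing in the bytes names them.
        System.out.println(readString(in) + " " + readLong(in)); // prints "ab 3"
    }
}
```

Reading the same four bytes with a different schema (say, two longs) would not fail but would silently give different values, so the reader class must match the schema the producer wrote with.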

Upvotes: 3

ChristDist

Reputation: 768

If you are using Maven for your project, you can add the following plugin to your pom.xml to auto-generate the Avro classes from your .avsc files.

        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        <stringType>String</stringType>
                        <fieldVisibility>PRIVATE</fieldVisibility>
                    </configuration>
                </execution>
            </executions>
        </plugin>

I found http://www.javawenti.com/?post=16104 to be a more complete sample, which you can try:

How to encode/decode Kafka messages using Avro binary encoder?

Upvotes: 0
