rb_9999
rb_9999

Reputation: 25

How to change a value in a field in an Avro object with Kafka Streams application

My Kafka Streams App is filtering for an Avro Object that look like this:

  "num_enterprise_txn_entity_edition": 1, 
  "num_enterprise_txn_entity_version": 1, 
  "dte_event_occurred": "2022-06-30T18:42:49.533301Z", 
  "nme_creator": "AvroProducer", 
  "nme_event_type": "ReadyToSubmit"}

My Kafka Streams consumes that object and transforms the nme_creator and nme_event_type field. The final Avro object should look like:

  "num_enterprise_txn_entity_edition": 1, 
  "num_enterprise_txn_entity_version": 1, 
  "dte_event_occurred": "2022-06-30T18:42:49.533301Z", 
  "nme_creator": "KafkaStreamsApp", 
  "nme_event_type": "NewSubmission"}

The problem is my code is not transforming nor is it producing to the topic.

import java.io.InputStream;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;

import javax.xml.bind.helpers.ValidationEventLocatorImpl;


public class streams {

    public static void main(String[] args) throws IOException {

        Properties properties = new Properties();

        InputStream in = streams.class.getClassLoader().getResourceAsStream("kafkastream.properties");
        properties.load(in);

        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty("producer.send.topic");
        final String outputTopic = "SubmissionsTopic";

        StreamsBuilder builder = new StreamsBuilder();


        KStream<String, POCEntity> firstStream = builder.stream(inputTopic);
        firstStream.peek((key, value) -> System.out.println("Value " + value))
            .filter((key, value) -> value.getNmeEventType().equals("ReadyToSubmit"))
            .mapValues((ValueMapper<POCEntity, Object>) pocEntity -> {
                POCEntity pocEntity1 = new POCEntity().newBuilder()
                    .setIdtEnterpriseTxnEntity(pocEntity.getIdtEnterpriseTxnEntity())
                    .setNumEnterpriseTxnEntityEdition(pocEntity.getNumEnterpriseTxnEntityEdition())
                    .setNumEnterpriseTxnEntityVersion(pocEntity.getNumEnterpriseTxnEntityVersion())
                    .setDteEventOccurred(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
                    .setNmeCreator("StreamsApp")
                    .setNmeEventType("NewSubmission").build();

                return pocEntity1;
            })
            .peek((key, value) -> System.out.println("Value " + value))
            .to(outputTopic);

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        kafkaStreams.start();

    }

}

What am I doing wrong?

Upvotes: 0

Views: 609

Answers (1)

Harsh Mishra
Harsh Mishra

Reputation: 2135

Try this. let me know if doesn't work

 .mapValues(pocEntity -> {
            return POCEntity.newBuilder()
                .setIdtEnterpriseTxnEntity(pocEntity.getIdtEnterpriseTxnEntity())
                .setNumEnterpriseTxnEntityEdition(pocEntity.getNumEnterpriseTxnEntityEdition())
                .setNumEnterpriseTxnEntityVersion(pocEntity.getNumEnterpriseTxnEntityVersion())
                .setDteEventOccurred(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
                .setNmeCreator("StreamsApp")
                .setNmeEventType("NewSubmission").build();

            
        })

Upvotes: 2

Related Questions