Reputation: 25
My Kafka Streams app filters for an Avro object that looks like this:
"num_enterprise_txn_entity_edition": 1,
"num_enterprise_txn_entity_version": 1,
"dte_event_occurred": "2022-06-30T18:42:49.533301Z",
"nme_creator": "AvroProducer",
"nme_event_type": "ReadyToSubmit"}
My Kafka Streams app consumes that object and transforms the nme_creator and nme_event_type fields. The final Avro object should look like:
"num_enterprise_txn_entity_edition": 1,
"num_enterprise_txn_entity_version": 1,
"dte_event_occurred": "2022-06-30T18:42:49.533301Z",
"nme_creator": "KafkaStreamsApp",
"nme_event_type": "NewSubmission"}
The problem is that my code neither transforms the record nor produces anything to the output topic.
import java.io.InputStream;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import com.kinsaleins.avro.POCEntity;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import javax.xml.bind.helpers.ValidationEventLocatorImpl;
/**
 * Kafka Streams application that reads {@code POCEntity} records from the topic named by the
 * {@code producer.send.topic} property, keeps only those whose event type is
 * {@code "ReadyToSubmit"}, rewrites them as {@code "NewSubmission"} events and writes the
 * result to {@code SubmissionsTopic}.
 *
 * <p>Configuration is loaded from the classpath resource {@code kafkastream.properties}.
 * NOTE(review): {@code SpecificAvroSerde} presumably needs {@code schema.registry.url} in that
 * file — confirm it is present.
 */
public class streams {

    /** Classpath resource holding the Kafka client / Streams configuration. */
    private static final String PROPERTIES_RESOURCE = "kafkastream.properties";

    /** Property key that names the topic this application consumes from. */
    private static final String INPUT_TOPIC_PROPERTY = "producer.send.topic";

    /** Event type a record must carry to pass the filter. */
    private static final String READY_TO_SUBMIT = "ReadyToSubmit";

    /**
     * Builds the topology and starts the Kafka Streams client.
     *
     * @param args unused
     * @throws java.io.IOException if the properties resource is missing or cannot be read
     * @throws IllegalStateException if the input topic property is not configured
     */
    public static void main(String[] args) throws java.io.IOException {
        Properties properties = loadProperties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        final String inputTopic = properties.getProperty(INPUT_TOPIC_PROPERTY);
        if (inputTopic == null || inputTopic.isEmpty()) {
            // Fail with a clear message instead of an NPE from builder.stream(null).
            throw new IllegalStateException(
                    "Missing required property '" + INPUT_TOPIC_PROPERTY + "' in " + PROPERTIES_RESOURCE);
        }
        final String outputTopic = "SubmissionsTopic";

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, POCEntity> firstStream = builder.stream(inputTopic);
        firstStream.peek((key, value) -> System.out.println("Value " + value))
                .filter((key, value) -> isReadyToSubmit(value))
                // No ValueMapper<POCEntity, Object> cast: the value type stays POCEntity downstream.
                .mapValues(streams::toNewSubmission)
                .peek((key, value) -> System.out.println("Value " + value))
                .to(outputTopic);

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
        // Close the client cleanly when the JVM is asked to stop.
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        kafkaStreams.start();
    }

    /** Loads the configuration from the classpath, closing the stream afterwards. */
    private static Properties loadProperties() throws java.io.IOException {
        Properties properties = new Properties();
        try (InputStream in = streams.class.getClassLoader().getResourceAsStream(PROPERTIES_RESOURCE)) {
            if (in == null) {
                throw new java.io.FileNotFoundException(
                        "Classpath resource not found: " + PROPERTIES_RESOURCE);
            }
            properties.load(in);
        }
        return properties;
    }

    /**
     * Returns whether the record should be forwarded.
     *
     * <p>The comparison goes through {@code String.valueOf} because, depending on how
     * {@code POCEntity} was generated, the getter may return an Avro {@code Utf8}
     * ({@code CharSequence}) rather than a {@code String}; calling {@code equals} on such a value
     * with a {@code String} argument never matches, which silently drops every record.
     * Null values (tombstones) and null event types are rejected rather than throwing.
     */
    private static boolean isReadyToSubmit(POCEntity value) {
        return value != null && READY_TO_SUBMIT.equals(String.valueOf(value.getNmeEventType()));
    }

    /**
     * Copies the identifying fields of {@code source} into a new entity stamped with the current
     * UTC instant, creator {@code "StreamsApp"} and event type {@code "NewSubmission"}.
     */
    private static POCEntity toNewSubmission(POCEntity source) {
        // newBuilder() is static; call it on the class rather than on a throwaway instance.
        return POCEntity.newBuilder()
                .setIdtEnterpriseTxnEntity(source.getIdtEnterpriseTxnEntity())
                .setNumEnterpriseTxnEntityEdition(source.getNumEnterpriseTxnEntityEdition())
                .setNumEnterpriseTxnEntityVersion(source.getNumEnterpriseTxnEntityVersion())
                .setDteEventOccurred(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
                // NOTE(review): the expected output in the question shows "KafkaStreamsApp" — confirm which is intended.
                .setNmeCreator("StreamsApp")
                .setNmeEventType("NewSubmission")
                .build();
    }
}
What am I doing wrong?
Upvotes: 0
Views: 609
Reputation: 2135
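Try this, and let me know if it doesn't work. It drops the explicit ValueMapper<POCEntity, Object> cast and calls POCEntity.newBuilder() statically instead of on a new instance: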
// Replacement for the mapValues step: a plain lambda with no explicit ValueMapper cast.
.mapValues(pocEntity -> {
// Start from the static builder factory rather than from a newly constructed POCEntity.
return POCEntity.newBuilder()
// Carry the identifier, edition and version over from the incoming record.
.setIdtEnterpriseTxnEntity(pocEntity.getIdtEnterpriseTxnEntity())
.setNumEnterpriseTxnEntityEdition(pocEntity.getNumEnterpriseTxnEntityEdition())
.setNumEnterpriseTxnEntityVersion(pocEntity.getNumEnterpriseTxnEntityVersion())
// Stamp the new record with the current UTC time formatted as ISO_INSTANT.
.setDteEventOccurred(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
// Overwrite the creator and event type with the values for the outgoing record.
.setNmeCreator("StreamsApp")
.setNmeEventType("NewSubmission").build();
})
Upvotes: 2