Reputation: 1
I have a large JSON message coming in from a Kafka topic that I am converting to a Java object so I can pull out just the values I need for the final DB. Some of the records will have an array of defects in them that I need to capture and send to a different topic so they can end up in their own table in the DB. Values are inserted into the DB using a sink connector, which is why we are using multiple topics.
I have found branching and split, but it looks like that is more for determining which topic a consumed record should go to, not for sending different parts of the record to different topics. Is there a way to do this, or do I need to change my architecture somewhere?
@Autowired
void buildPipeline(StreamsBuilder builder) throws Exception {
    KStream<String, String> messageStream = builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
    logger.info("started consumer");
    System.out.println(messageStream.toString());

    KStream<String, String> auditRecords = messageStream
            .map((key, value) -> {
                try {
                    return new KeyValue<>("", mapStringToAuditOutput(value));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
                return null;
            });

    auditRecords.to(outputTopic);
}
public String mapStringToAuditOutput(String input) throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    AuditOutput auditResults = null;
    try {
        auditResults = mapper.readValue(input, AuditOutput.class);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    // check for multiple defects
    if (auditResults.getCaseObject().getDefects() != null) {
        // send defects to separate topic
    }
    String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
    System.out.println(auditJson);
    return auditJson;
}
Upvotes: 0
Views: 1143
Reputation: 191681
"found branching and split, but it looks like that is more for determining which topic a consumed record should go to"
Correct. You need to filter + map/mapValues prior to the branching in order to send parts of (or whole) events to different topics.
More specifically, create intermediate KStream instances and use to() multiple times.
For example,
// Change the value serde to use your JSON class
KStream<String, AuditOutput> auditRecords = messageStream
        .mapValues(value -> {
            try {
                // input stream can be <String, String>, but this step would be automatic if using a JSONSerde
                return mapStringToAuditOutput(value);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            return null;
        })
        .filter((key, value) -> Objects.nonNull(value)); // remove bad JSON events

Map<String, KStream<String, AuditOutput>> branches = auditRecords.split(Named.as("Branch-"))
        .branch((key, value) -> value.getCaseObject().getDefects() != null, /* first predicate */
                Branched.as("Defects"))
        .defaultBranch(Branched.as("NoDefects")); /* default branch */

branches.get("Branch-Defects").to(...)
branches.get("Branch-NoDefects").mapValues(... flatten ... ).to(...)
Upvotes: 1