Kevin McMahon

Reputation: 1

Consuming json with kafkastreams that I want to split and send to two different topics

I have a large JSON payload coming in from a Kafka topic that I am converting to a Java object to pull out just the values I need for the final DB. Some of the records will have an array of defects in them that I need to capture and send to a different topic so they can end up in their own table in the DB. Values are inserted into the DB using a sink connector, which is why we are using multiple topics.

I have found branching and split, but it looks like those are more for determining which topic a consumed record should go to, not for sending different parts of the record to different topics. Is there a way to do this, or do I need to change my architecture somewhere?

    @Autowired
    void buildPipeline(StreamsBuilder builder) throws Exception {
        KStream<String, String> messageStream = builder.stream(inputTopic, Consumed.with(STRING_SERDE, STRING_SERDE));
        logger.info("started consumer");
        System.out.println(messageStream.toString());

        KStream<String, String> auditRecords = messageStream
                .map((key, value) -> {
                    try {
                        return new KeyValue<>("", mapStringToAuditOutput(value));
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
        auditRecords.to(outputTopic);
    }
    public String mapStringToAuditOutput(String input) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        AuditOutput auditResults = null;
        try {
            auditResults = mapper.readValue(input, AuditOutput.class);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        // check for multiple defects
        if (auditResults.getCaseObject().getDefects() != null) {
            // send defects to separate topic

        }
        String auditJson = JsonFlattener.flatten(mapper.writeValueAsString(auditResults));
        System.out.println(auditJson);
        return auditJson;
    }

Upvotes: 0

Views: 1143

Answers (1)

OneCricketeer

Reputation: 191681

found branching and split, but it looks like that is more for determining which topic a consumed record should go to

Correct. You need to filter + map/mapValues prior to the branching to send parts of, or whole, events to different topics.

More specifically, create intermediate KStream instances and call to() multiple times.

For example,

// Change the value serde to use your JSON class
KStream<String, AuditOutput> auditRecords = messageStream
    .mapValues(value -> {
        try {
            // the input stream can be <String, String>, but this step would be automatic if using a JsonSerde
            return mapStringToAuditOutput(value);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    })
    .filter((key, value) -> Objects.nonNull(value)); // remove bad JSON events

Map<String, KStream<String, AuditOutput>> branches = auditRecords.split(Named.as("Branch-"))
    .branch((key, value) -> value.getCaseObject().getDefects() != null, /* first predicate */
        Branched.as("Defects"))
    .defaultBranch(Branched.as("NoDefects"));             /* default branch */

branches.get("Branch-Defects").to(...);
branches.get("Branch-NoDefects").mapValues(... flatten ...).to(...);
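If you want the defects themselves (not just the records that contain defects) to land on their own topic, you can also reuse the parsed stream twice: flatMapValues each defect out to the defects topic, and mapValues the remainder to the audit topic. Below is a sketch of just the value-level logic, stripped of Kafka so it runs standalone; the Audit record here is a hypothetical simplification of your AuditOutput/CaseObject model.

```java
import java.util.List;

public class DefectSplit {
    // Hypothetical simplified model; your AuditOutput/CaseObject classes differ.
    record Audit(String caseId, List<String> defects) {}

    // What you'd pass to flatMapValues(...): zero or more output records, one per defect.
    static List<String> defectValues(Audit a) {
        return a.defects() == null ? List.of() : a.defects();
    }

    // What you'd pass to mapValues(...): the audit payload with defects removed.
    static Audit withoutDefects(Audit a) {
        return new Audit(a.caseId(), null);
    }

    public static void main(String[] args) {
        Audit in = new Audit("case-1", List.of("dent", "scratch"));
        System.out.println(defectValues(in));   // one element per defect
        System.out.println(withoutDefects(in)); // defects stripped
    }
}
```

In the topology that would look something like `auditRecords.flatMapValues(DefectSplit::defectValues).to(defectsTopic)` and `auditRecords.mapValues(DefectSplit::withoutDefects).to(auditTopic)` (topic names are yours to supply), which avoids branching entirely since every record contributes to the audit topic anyway.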

Upvotes: 1
