Zigmaphi
Zigmaphi

Reputation: 15

Write to kafka Topic based on the content on content of record using kafkastreams

I'm trying to write from a topic(parent) to another topic(child) in kafka based on the content of the records in the parent. A sample record if i consume from the parent topic is {"date":{"string":"2017-03-20"},"time":{"string":"20:04:13:563"},"event_nr":1572470,"interface":"Transaction Manager","event_id":5001,"date_time":1490040253563,"entity":"Transaction Manager","state":0,"msg_param_1":{"string":"ISWSnk"},"msg_param_2":{"string":"Application startup"},"msg_param_3":null,"msg_param_4":null,"msg_param_5":null,"msg_param_6":null,"msg_param_7":null,"msg_param_8":null,"msg_param_9":null,"long_msg_param_1":null,"long_msg_param_2":null,"long_msg_param_3":null,"long_msg_param_4":null,"long_msg_param_5":null,"long_msg_param_6":null,"long_msg_param_7":null,"long_msg_param_8":null,"long_msg_param_9":null,"last_sent":{"long":1490040253563},"transmit_count":{"int":1},"team_id":null,"app_id":{"int":4},"logged_by_app_id":{"int":4},"entity_type":{"int":3},"binary_data":null}.

I'll like to use the value of entity to write to a topic of the same name as the value of entity(There's a fixed amount of values of entity so i can statically create that if it's difficult programmatically to dynamically create topics). I'm trying to use this

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;

public class entityDataLoader {
    public static void main(final String[] args) throws Exception {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-lambda-example");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    // Set up serializers and deserializers, which we will use for overriding the default serdes
    // specified above.
    final Serde<String> stringSerde = Serdes.String();
    final Serde<byte[]> byteArraySerde = Serdes.ByteArray();

    // In the subsequent lines we define the processing topology of the Streams application.
    final KStreamBuilder builder = new KStreamBuilder();

    // Read the input Kafka topic into a KStream instance.
    final KStream<byte[], String> textLines = builder.stream(byteArraySerde, stringSerde, "postilion-events");

    String content = textLines.toString();
    String entity = JSONExtractor.returnJSONValue(content, "entity");
    System.out.println(entity);

    textLines.to(entity);

    final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.cleanUp();
    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The content of content is org.apache.kafka.streams.kstream.internals.KStreamImpl@568db2f2 making it obvious that @KStream.toString() isn't the correct method to use to attempt to get the value of entity.

P.S. The JSONExtractor class is defined as

import org.json.simple.JSONObject;
import org.json.simple.parser.ParseException;
import org.json.simple.parser.JSONParser;
class JSONExtractor {

public static String returnJSONValue(String args, String value){
    JSONParser parser = new JSONParser();
    String app= null;
    System.out.println(args);
    try{
        Object obj = parser.parse(args);
        JSONObject JObj = (JSONObject)obj;
        app= (String) JObj.get(value);
        return app;
    }
    catch(ParseException pe){
        System.out.println("No Object found");
        System.out.println(pe);
    }
    return app;
}
}

Upvotes: 0

Views: 772

Answers (1)

Matthias J. Sax
Matthias J. Sax

Reputation: 62285

You can use branch() to split your parent stream into "sub streams" and write each "sub stream" to one output topic (cf. http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations)

Your branch() must create a single "sub stream" for all you output topic, but because you know all you topics, this should not be a problem.

Also, for Kafka Streams, it's recommended to create all output topic before you start you application (cf. http://docs.confluent.io/current/streams/developer-guide.html#user-topics)

Upvotes: 1

Related Questions