Reputation: 23
I have a setup where each Kafka message contains a "sender" field. All these messages are sent to a single topic.
Is there a way to segregate these messages on the consumer side? I would like a sender-specific consumer that reads only the messages pertaining to that sender.
Should I be using Kafka Streams to achieve this? I am new to Kafka Streams; any advice or guidance will be helpful.
public class KafkaStreams3 {
    public static void main(String[] args) throws JSONException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreams1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final Serde<String> stringSerde = Serdes.String();

        Properties kafkaProperties = new Properties();
        kafkaProperties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProperties);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> source = builder.stream(stringSerde, stringSerde, "topic1");

        // Re-key each record by its "sender" field.
        KStream<String, String> s1 = source.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String dummy, String record) {
                try {
                    JSONObject jsonObject = new JSONObject(record);
                    return new KeyValue<>(jsonObject.get("sender").toString(), record);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return new KeyValue<>(record, record);
                }
            }
        });
        s1.print();

        // Forward each record to a topic named after its sender key.
        s1.foreach(new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                producer.send(new ProducerRecord<>(key, key, value));
            }
        });

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
                producer.close();
            }
        }));
    }
}
Upvotes: 0
Views: 1483
Reputation: 4438
I believe the simplest way to achieve this is to use your "sender" field as the key and to have a single topic partitioned by "sender". This gives you locality and ordering per "sender" (a stronger per-sender ordering guarantee), and you can connect clients that consume from specific partitions.
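To see why keying by sender gives that locality: Kafka's default partitioner hashes the key with murmur2 and takes the result modulo the partition count, so every message from the same sender lands on the same partition. A self-contained sketch of that mapping (the murmur2 routine mirrors Kafka's default partitioner; the sender names and partition count of 6 are illustrative):

```java
public class DefaultPartitioning {

    // murmur2 hash as used by Kafka's default partitioner (seed 0x9747b28c).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the trailing 1-3 bytes.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same rule the producer applies when a key is set and no partition is given:
    // positive hash modulo the number of partitions.
    static int partitionFor(String sender, int numPartitions) {
        return (murmur2(sender.getBytes()) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Each sender deterministically maps to one partition of the topic.
        System.out.println("alice -> partition " + partitionFor("alice", 6));
        System.out.println("bob   -> partition " + partitionFor("bob", 6));
    }
}
```

A consumer for one sender can then use `KafkaConsumer.assign()` with that sender's `TopicPartition` instead of `subscribe()`, though note that multiple senders may share a partition unless you have at least as many partitions as senders.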
Another possibility is to stream the messages from the initial topic to other topics, aggregating by key, so you end up with one topic per "sender".
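With the newer Streams DSL (the `KStreamBuilder` API in your code is deprecated in recent versions), this fan-out can be done entirely inside the topology with a `TopicNameExtractor`, without nesting a separate producer in a `foreach`. A minimal sketch, assuming the input records are already keyed by sender and that the per-sender topics (named `sender-<key>` here, my own convention) exist or auto-creation is enabled:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class SenderRouter {

    // Routing rule kept as a pure helper: sender key -> destination topic.
    static String topicFor(String sender) {
        return "sender-" + sender;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sender-router");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("topic1", Consumed.with(Serdes.String(), Serdes.String()))
               // TopicNameExtractor decides the destination topic per record,
               // so each sender's messages end up in their own topic.
               .to((key, value, recordContext) -> topicFor(key),
                   Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Compared with the `foreach` + producer approach, this keeps delivery inside the Streams runtime, so you get its usual processing guarantees instead of managing a second client's lifecycle yourself.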
Here's a fragment of code for a producer, followed by a stream that uses JSON serializers and deserializers.
Producer:
private Properties kafkaClientProperties() {
    Properties properties = new Properties();
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    properties.put("bootstrap.servers", config.getHost());
    properties.put("client.id", clientId);
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", jsonSerializer.getClass());
    return properties;
}

public Future<RecordMetadata> send(String topic, String key, Object instance) {
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.convertValue(instance, JsonNode.class);
    return kafkaProducer.send(new ProducerRecord<>(topic, key, jsonNode));
}
The stream:
log.info("loading kafka stream configuration");
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStreamBuilder kStreamBuilder = new KStreamBuilder();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getStreamEnrichProduce().getId());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);

// stream from topic...
KStream<String, JsonNode> stockQuoteRawStream =
        kStreamBuilder.stream(Serdes.String(), jsonSerde, config.getStockQuote().getTopic());

Map<String, Map> exchanges = stockExchangeMaps.getExchanges();
ObjectMapper objectMapper = new ObjectMapper();
kafkaProducer.configure(config.getStreamEnrichProduce().getTopic());

// enrich stockquote with stockdetails before producing to the new topic
stockQuoteRawStream.foreach((key, jsonNode) -> {
    StockQuote stockQuote = null;
    StockDetail stockDetail;
    try {
        stockQuote = objectMapper.treeToValue(jsonNode, StockQuote.class);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    JsonNode exchangeNode = jsonNode.get("exchange");
    // get the stockDetail that matches the quote being processed
    Map<String, StockDetail> stockDetailMap = exchanges.get(exchangeNode.toString().replace("\"", ""));
    stockDetail = stockDetailMap.get(key);
    stockQuote.setStockDetail(stockDetail);
    kafkaProducer.send(config.getStreamEnrichProduce().getTopic(), null, stockQuote);
});

return new KafkaStreams(kStreamBuilder, props);
Upvotes: 1