Reputation: 41
How do I perform a one-to-many join between two Kafka KStreams? The code below joins two Kafka KStreams in a one-to-one manner. Can someone guide me on how to perform a one-to-many join between KStreams? The data received from the topic has the generic types <String, JsonNode>. The data being written to the output topic has the form {"from order":"test1","from orderitem":"test2"} {"from order":"test1","from orderitem":"test3"}
Is it possible to get the data in this combined format instead, i.e. one record per order that contains all of its order items: {"from order":"test1",{"from orderitem":"test2"},{"from orderitem":"test3"}}
/**
 * Builds a Kafka Streams topology that joins {@code TEST.S_ORDER} records with
 * {@code TEST.S_ORDER_ITEM} records read from {@link #TOPIC}, writes the join results to
 * {@link #FINALTOPIC}, and then polls {@link #FINALTOPIC}, handing every non-empty batch
 * to a {@code MessageConsumer} running on a fixed-size worker pool.
 */
public class ConsumerThreadPool {
    private static final String TOPIC = "jre1";
    private static final String FINALTOPIC = "jvm1";
    private static final int NUM_THREADS = 1;
    /** Window size handed to {@code JoinWindows.of} for the order / order-item join (30 s). */
    private static final long JOIN_WINDOW_MS = TimeUnit.SECONDS.toMillis(30);

    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
    final Serde<String> stringSerde = Serdes.String();
    /** Sequence number passed to each submitted MessageConsumer; incremented by the poll loop. */
    int threadNumber = 0;

    @Autowired
    private ConsumerConfigFactory consumerConfigFactory;

    @SuppressWarnings("unused")
    private ConsumerConnector consumer;

    private final ExecutorService threadPool;

    public ConsumerThreadPool() {
        threadPool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    /**
     * Starts the order / order-item join topology and then polls its output topic.
     *
     * <p>NOTE(review): {@link #consume()} loops forever, so this {@code @PostConstruct}
     * method never returns and blocks whichever thread initializes this bean. Confirm that
     * this is intended, or move the poll loop onto its own thread.
     */
    @PostConstruct
    public void startConsuming() {
        ConsumerConfig consumerConfig = consumerConfigFactory.getConsumerConfig();
        consumer = createJavaConsumerConnector(consumerConfig);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, JsonNode> streams = builder.stream(TOPIC);

        // Re-key orders by ROW_ID and order items by ORDER_ID so that an order and its items
        // share a key; the windowed KStream-KStream join then emits one record per matching
        // (order, item) pair.
        KStream<String, JsonNode> orderstream = streams
                .filter((k, v) -> isTable(v, "TEST.S_ORDER"))
                .map((k, v) -> KeyValue.pair(v.path("after").path("ROW_ID").asText(), v));
        KStream<String, JsonNode> orderlist = streams
                .filter((k, v) -> isTable(v, "TEST.S_ORDER_ITEM"))
                .map((k, v) -> KeyValue.pair(v.path("after").path("ORDER_ID").asText(), v));

        KStream<String, JsonNode> nextstream = orderstream.join(
                orderlist,
                (first, second) -> joinOpTypes(first, second),
                JoinWindows.of(JOIN_WINDOW_MS),
                stringSerde, jsonSerde, jsonSerde);
        nextstream.to(stringSerde, jsonSerde, FINALTOPIC);

        KafkaStreams stream = new KafkaStreams(builder, consumerConfigFactory.getConsumeConfig());
        stream.start();
        try {
            consume();
        } finally {
            // close() used to follow the never-returning consume() call and so never ran;
            // in a finally block it at least runs when the poll loop exits via an exception.
            stream.close();
            threadPool.shutdown();
        }
    }

    /** Returns true when the record's "table" field equals {@code table}. */
    private static boolean isTable(JsonNode value, String table) {
        return table.equals(value.path("table").asText());
    }

    /**
     * Combines an order record and an order-item record into
     * {@code {"from order": <order op_type>, "from orderitem": <item op_type>}}.
     *
     * <p>A missing {@code op_type} is written as JSON null instead of throwing a
     * NullPointerException. A non-textual {@code op_type} was already written as null.
     */
    private static JsonNode joinOpTypes(JsonNode order, JsonNode orderItem) {
        ObjectNode joined = JsonNodeFactory.instance.objectNode();
        // path() returns a MissingNode (textValue() == null) instead of null, unlike get().
        joined.put("from order", order.path("op_type").textValue());
        joined.put("from orderitem", orderItem.path("op_type").textValue());
        return joined;
    }

    /**
     * Polls {@link #FINALTOPIC} forever and submits every non-empty batch of records to the
     * worker pool as a {@code MessageConsumer}, tagging each with the current
     * {@link #threadNumber}. The Kafka consumer is closed if the loop exits with an exception.
     */
    public void consume() {
        // Named kafkaConsumer so it no longer shadows the 'consumer' field.
        try (KafkaConsumer<String, String> kafkaConsumer =
                new KafkaConsumer<>(consumerConfigFactory.createConsume())) {
            kafkaConsumer.subscribe(Arrays.asList(FINALTOPIC));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                if (!records.isEmpty()) {
                    System.out.println("ConsumerRecords object created: " + records);
                    threadPool.submit(new MessageConsumer(records, threadNumber));
                    threadNumber++;
                }
            }
        }
    }
}
Upvotes: 0
Views: 862
Reputation: 62360
As you already noted, a KStream-KStream join is already a one-to-many join. It seems you want to aggregate all join results for a given key into a single record.
You can apply .groupByKey().aggregate() to the join result to do this. Initialize the aggregation with an empty JSON object, and add each new join result to that JSON object as it arrives.
Upvotes: 2