Reputation: 978
I'm learning Kafka Streams and trying to join two streams (with JSON values) over a 5-minute window. My understanding is that records need the same key to satisfy the join criteria. If that is correct, only records with matching keys are joined, right? If so, how do I join the JSON values? For example, Stream1: key=a, value={a,b,c}. Stream2: key=a, value={x} and key=a, value={y}. Expected output: {a,b,c,x} and {a,b,c,y}.
To achieve this, what should my ValueJoiner look like? My sample code:
KStream<String, JsonNode> resultStream = stream1.leftJoin(stream2,
    new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
        @Override
        public JsonNode apply(JsonNode value1, JsonNode value2) {
            if (value1 != null && value2 != null) {
                return value1;
            }
            return null;
        }
    },
    JoinWindows.of(TimeUnit.SECONDS.toMillis(20)),
    Joined.with(Serdes.String(), /* key */
        jsonSerde,               /* left value */
        jsonSerde)               /* right value */
);
Upvotes: 0
Views: 1087
Reputation: 62360
Your understanding of how joins work is correct: only records with the same key are joined (assuming that the difference between the two record timestamps is smaller than the join-window size).
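To make the window condition concrete, here is a minimal sketch in plain Java. `withinWindow` is a hypothetical helper written for illustration, not a Kafka Streams API, and treating the window boundary as inclusive is an assumption; check the `JoinWindows` documentation for the exact semantics.

```java
import java.time.Duration;

class JoinWindowSketch {
    // Hypothetical helper (not part of Kafka Streams): two records with the
    // same key are join candidates when their timestamps differ by no more
    // than the window size. The inclusive boundary is an assumption.
    static boolean withinWindow(long leftTsMs, long rightTsMs, Duration window) {
        return Math.abs(leftTsMs - rightTsMs) <= window.toMillis();
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(5);
        // Timestamps 4 minutes apart: inside a 5-minute window.
        System.out.println(withinWindow(0L, 240_000L, window));
        // Timestamps 6 minutes apart: outside the window, so no join.
        System.out.println(withinWindow(0L, 360_000L, window));
    }
}
```

Note that the sample code in the question uses a 20-second window, so records more than 20 seconds apart would not be joined there.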
For manipulating JsonNodes, just search the internet: How to modify JsonNode in Java?
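As a rough sketch of the merge logic a joiner could apply, the example below uses `java.util.Map` as a stand-in for a JSON object and `BiFunction` as a stand-in for `ValueJoiner`, so that it runs without Kafka or Jackson on the classpath. The class name, field names, and values are made up for illustration. With Jackson, the same idea would presumably be to call `deepCopy()` on the left `ObjectNode` and then `setAll(...)` with the right one, assuming both values are JSON objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

class MergeJoinerSketch {
    // Stand-in for ValueJoiner<JsonNode, JsonNode, JsonNode>: copies the left
    // value and adds the right value's fields. The left value is not mutated.
    static final BiFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> MERGE =
        (left, right) -> {
            Map<String, Object> merged = new LinkedHashMap<>(left);
            // In a left join the right side may be missing (null); in that
            // case the copy of the left value is returned unchanged.
            if (right != null) {
                merged.putAll(right);
            }
            return merged;
        };

    public static void main(String[] args) {
        Map<String, Object> left = new LinkedHashMap<>();
        left.put("a", 1);
        left.put("b", 2);
        left.put("c", 3);

        // Two right-side records with the same key produce two merged results.
        System.out.println(MERGE.apply(left, Map.of("x", 10)));
        System.out.println(MERGE.apply(left, Map.of("y", 20)));
        // No matching right-side record (left join).
        System.out.println(MERGE.apply(left, null));
    }
}
```

Copying before merging matters here because the same left-side record is joined against several right-side records; mutating it in place would leak `x` into the `y` result.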
Upvotes: 1