srikanth

Reputation: 978

How to join two streams in Kafka?

I'm learning Kafka Streams and trying to join two streams (JSON values) over a 5-minute window. My understanding is that records need the same key to satisfy the join criteria, so only records with matching keys get joined. Is that right? If so, how do I combine the JSON values? For example, Stream1: key=a, value={a,b,c}. Stream2: key=a, value={x} and key=a, value={y}. Expected output: {a,b,c,x} and {a,b,c,y}.

To achieve this, what should my ValueJoiner look like? My sample code:

KStream<String, JsonNode> resultStream = stream1.leftJoin(stream2,
                new ValueJoiner<JsonNode, JsonNode, JsonNode>() {
                    @Override
                    public JsonNode apply(JsonNode value1, JsonNode value2) {
                        if (value1 != null && value2 != null) {


                            return value1;
                        }
                        return null;
                    }
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(20)), Joined.with(Serdes.String(), /* key */
                        jsonSerde, /* left value */
                        jsonSerde) /* right value */
        );

Upvotes: 0

Views: 1087

Answers (1)

Matthias J. Sax

Reputation: 62360

Your understanding of how the joins work is correct (assuming that the record timestamp difference is smaller than the join-window size).
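To make the matching rule concrete, here is a minimal plain-Java sketch of it. It does not use the Kafka Streams API; `WindowedJoinSketch`, `Event`, and `innerJoin` are hypothetical names for illustration. Two records pair up only when their keys are equal and their timestamps are no further apart than the window. It models only the matched case, so it leaves out the unmatched left records that a `leftJoin` would also emit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowedJoinSketch {

    /** A keyed, timestamped record, loosely modelled on a Kafka record (hypothetical type). */
    static final class Event {
        final String key;
        final long timestampMs;
        final String value;

        Event(String key, long timestampMs, String value) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /** Pairs every left record with every right record that has the same key
     *  and a timestamp at most windowMs away; returns the concatenated values. */
    static List<String> innerJoin(List<Event> left, List<Event> right, long windowMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60 * 1000L;
        List<Event> stream1 = Arrays.asList(new Event("a", 0L, "a,b,c"));
        List<Event> stream2 = Arrays.asList(
                new Event("a", 60_000L, "x"),    // same key, 1 minute apart: joins
                new Event("a", 120_000L, "y"),   // same key, 2 minutes apart: joins
                new Event("a", 600_000L, "z"),   // same key, 10 minutes apart: outside the window
                new Event("b", 60_000L, "w"));   // different key: never joins
        for (String joined : innerJoin(stream1, stream2, fiveMinutesMs)) {
            System.out.println(joined);
        }
    }
}
```

With the data in `main`, only the `x` and `y` records pair with the left record, which corresponds to the two expected results in the question.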

For combining the two values, you need to manipulate the JsonNodes inside the ValueJoiner; just search the internet: How to modify JsonNode in Java?
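As one possible starting point, here is a minimal sketch of a merge helper, assuming Jackson (`com.fasterxml.jackson.databind`) is on the classpath. `JsonMergeSketch` and `merge` are hypothetical names, not part of Kafka Streams or Jackson. The question does not say whether the values are JSON arrays or JSON objects, so the sketch handles both: arrays are concatenated, and objects are combined field by field, with the right side winning on duplicate field names. Returning the left value when the right side is null is also an assumption, chosen to suit a `leftJoin`.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonMergeSketch {

    /** Combines two JSON values without mutating either input. */
    static JsonNode merge(JsonNode left, JsonNode right) {
        if (left == null) {
            return null;
        }
        if (right == null) {
            // Assumption: with a leftJoin, an unmatched left record passes through unchanged.
            return left;
        }
        if (left.isObject() && right.isObject()) {
            // Copy the left object, then add or overwrite fields from the right one.
            ObjectNode merged = left.deepCopy();
            merged.setAll((ObjectNode) right);
            return merged;
        }
        if (left.isArray() && right.isArray()) {
            // Concatenate the elements: left first, then right.
            ArrayNode merged = JsonNodeFactory.instance.arrayNode();
            merged.addAll((ArrayNode) left);
            merged.addAll((ArrayNode) right);
            return merged;
        }
        throw new IllegalArgumentException("Cannot merge " + left.getNodeType()
                + " with " + right.getNodeType());
    }

    public static void main(String[] args) {
        JsonNodeFactory f = JsonNodeFactory.instance;
        JsonNode left = f.arrayNode().add("a").add("b").add("c");
        JsonNode right = f.arrayNode().add("x");
        System.out.println(merge(left, right));
    }
}
```

In the code from the question, the body of `apply(value1, value2)` could then return `JsonMergeSketch.merge(value1, value2)` instead of returning `value1` as is.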

Upvotes: 1
