Antonin

Reputation: 950

Kafka Streams: KTable as lookup and destination to merge streams

Hi, I have three streams of events that I want to merge using Kafka Streams.

I can't find a simple solution to the concurrency problem:

    // merged values KTable
    KTable<String, Merge> mergedTable = builder.table(
            getMergedValuesTopic(),
            [...]);


    // records A

    // stream
    KStream<String, RecordA> recordsAStream = builder.stream(
            getRecordATopic(),
            [...]);

    // rekeyed stream
    KStream<String, RecordA> recordsABySomeId = recordsAStream
            .selectKey((k, v) -> getKey(v));


    // records B

    // stream
    KStream<String, RecordB> recordsBStream = builder.stream(
            getRecordBTopic(),
            [...]);

    // rekeyed stream
    KStream<String, RecordB> recordsBBySomeId = recordsBStream
            .selectKey((k, v) -> getKey(v));


    // records C

    // stream
    KStream<String, RecordC> recordsCStream = builder.stream(
            getRecordCTopic(),
            [...]);

    // rekeyed stream
    KStream<String, RecordC> recordsCBySomeId = recordsCStream
            .selectKey((k, v) -> getKey(v));


    // when a recordA arrives
    KStream<String, Merge> aggFromA = recordsABySomeId
            .filter((k, v) -> v != null)
            // join recordA with the current merge result
            .leftJoin(mergedTable, (recA, oldMerge) -> {
                        if (oldMerge != null) {
                            return new Merge(recA, oldMerge.B, oldMerge.C);
                        }
                        return new Merge(recA, null, null);
                    },
                    [...]
            );

    // when a recordB arrives
    KStream<String, Merge> aggFromB = recordsBBySomeId
            .filter((k, v) -> v != null)
            // join recordB with the current merge result
            .leftJoin(mergedTable, (recB, oldMerge) -> {
                        if (oldMerge != null) {
                            return new Merge(oldMerge.A, recB, oldMerge.C);
                        }
                        return new Merge(null, recB, null);
                    },
                    [...]
            );


    // when a recordC arrives
    KStream<String, Merge> aggFromC = recordsCBySomeId
            .filter((k, v) -> v != null)
            // join recordC with the current merge result
            .leftJoin(mergedTable, (recC, oldMerge) -> {
                        if (oldMerge != null) {
                            return new Merge(oldMerge.A, oldMerge.B, recC);
                        }
                        return new Merge(null, null, recC);
                    },
                    [...]
            );


    // save aggregation
    aggFromA.merge(aggFromB).merge(aggFromC)
            .to(getMergedValuesTopic(), Produced.with(Serdes.String(), aggSerdes));



    return builder.build();

This snippet does not work reliably: the KTable built from getMergedValuesTopic() does not necessarily reflect the latest state of the merge when the lookup is done. When two different records for the same key arrive at about the same time, both lookups can see the same outdated state, and the second update then overwrites the first.
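To make the problem concrete, here is a minimal plain-Java sketch of the lost update. It does not use Kafka at all: a HashMap stands in for the KTable, records are plain Strings, and the `Merge` class, `joinA`/`joinB`, and both `...Lookups` methods are simplified stand-ins invented for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation of the stale lookup; no Kafka dependency.
class LostUpdateDemo {

    // Hypothetical stand-in for the merged value; records are plain Strings here.
    static final class Merge {
        final String a, b, c;
        Merge(String a, String b, String c) { this.a = a; this.b = b; this.c = c; }
    }

    // Same logic as the leftJoin lambda for a record A.
    static Merge joinA(String recA, Merge old) {
        return old != null ? new Merge(recA, old.b, old.c) : new Merge(recA, null, null);
    }

    // Same logic as the leftJoin lambda for a record B.
    static Merge joinB(String recB, Merge old) {
        return old != null ? new Merge(old.a, recB, old.c) : new Merge(null, recB, null);
    }

    // Both lookups happen before either join result reaches the table.
    static Merge staleLookups(String recA, String recB) {
        Map<String, Merge> table = new HashMap<>();
        Merge seenByA = table.get("key");       // nothing merged yet
        Merge seenByB = table.get("key");       // still nothing: A's result is not visible
        table.put("key", joinA(recA, seenByA));
        table.put("key", joinB(recB, seenByB)); // overwrites A's result
        return table.get("key");
    }

    // Each lookup sees the previous write, so nothing is lost.
    static Merge freshLookups(String recA, String recB) {
        Map<String, Merge> table = new HashMap<>();
        table.put("key", joinA(recA, table.get("key")));
        table.put("key", joinB(recB, table.get("key")));
        return table.get("key");
    }

    public static void main(String[] args) {
        Merge stale = staleLookups("a1", "b1");
        System.out.println("stale: A=" + stale.a + ", B=" + stale.b); // stale: A=null, B=b1
        Merge fresh = freshLookups("a1", "b1");
        System.out.println("fresh: A=" + fresh.a + ", B=" + fresh.b); // fresh: A=a1, B=b1
    }
}
```

In the stale case the A value is lost, which is the behaviour I am trying to avoid.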

Does anyone have a simple solution to this problem using Kafka Streams?

Upvotes: 2

Views: 1767

Answers (1)

Olivier Stroesser

Reputation: 36

I think a simple aggregation should do the trick. An aggregation does exactly what you describe: it uses a KTable as both lookup and destination.

For every incoming record, the aggregation table is checked for an existing entry with the same key. If none is found, the Initializer defined in the aggregation produces a new initial value (see the Kafka Streams documentation on aggregations).

Sample code:
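As a rough mental model of that lookup-then-update cycle, here is a plain-Java sketch without any Kafka dependency. A HashMap plays the role of the state store, and `Event`, `update`, and `aggregate` are names made up for this illustration, not Kafka Streams API. It relies on the fact that records with the same key are processed one after another, so each update sees the result of the previous one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Simplified model of groupByKey().aggregate(initializer, aggregator).
class AggregateSketch {

    // Hypothetical input record: key, record type ("A", "B" or "C") and payload.
    static final class Event {
        final String key, type, payload;
        Event(String key, String type, String payload) {
            this.key = key; this.type = type; this.payload = payload;
        }
    }

    // Hypothetical aggregate holding the latest payload of each type.
    static final class Merge { String a, b, c; }

    // Aggregator step: overwrite the slot that matches the record type.
    static Merge update(Event e, Merge agg) {
        switch (e.type) {
            case "A": agg.a = e.payload; break;
            case "B": agg.b = e.payload; break;
            case "C": agg.c = e.payload; break;
            default: break; // unknown types leave the aggregate unchanged
        }
        return agg;
    }

    // Processes events one at a time: look up the current aggregate for the key,
    // fall back to the initializer if there is none, then store the new result.
    static Map<String, Merge> aggregate(List<Event> events, Supplier<Merge> initializer) {
        Map<String, Merge> store = new HashMap<>();
        for (Event e : events) {
            Merge current = store.get(e.key);
            if (current == null) {
                current = initializer.get();
            }
            store.put(e.key, update(e, current));
        }
        return store;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("k1", "A", "a1"),
                new Event("k1", "B", "b1"),
                new Event("k2", "C", "c1"),
                new Event("k1", "A", "a2"));
        Map<String, Merge> result = aggregate(events, Merge::new);
        Merge k1 = result.get("k1");
        System.out.println(k1.a + " " + k1.b + " " + k1.c); // a2 b1 null
        System.out.println(result.get("k2").c);             // c1
    }
}
```

Because the lookup and the write happen in the same step against the same store, there is no window in which a second record can read an outdated aggregate.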

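import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class KTableMerge {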

protected Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    //Streams
    KStream<String, RecordA> recordAKStream = builder.stream("test-record-a");
    KStream<String, RecordB> recordBKStream = builder.stream("test-record-b");
    KStream<String, RecordC> recordCKStream = builder.stream("test-record-c");

    //Re-key and Merge Streams in parent 'Record' container
    KStream<String, Record> mergedStream =
        recordAKStream
            .selectKey((key, value) -> value.getForeignKey())
            .mapValues(value -> (Record) value)
            .merge(recordBKStream
                .selectKey((key, value) -> value.getForeignKey())
                .mapValues(value -> (Record) value))
            .merge(recordCKStream
                .selectKey((key, value) -> value.getForeignKey())
                .mapValues(value -> (Record) value));

    //Aggregate
    mergedStream
        .groupByKey()
        .aggregate(
            Merge::new,
            (key, value, aggregate) -> {
                if (value instanceof RecordA) {
                    aggregate.recordA = (RecordA) value;
                } else if (value instanceof RecordB) {
                    aggregate.recordB = (RecordB) value;
                } else if (value instanceof RecordC) {
                    aggregate.recordC = (RecordC) value;
                }
                return aggregate;
            })
        .toStream()
        .to("merge-topic");

    return builder.build();
}

private static class Merge {
    RecordA recordA;
    RecordB recordB;
    RecordC recordC;
}

private interface Record {
    String getForeignKey();
}

private static class RecordA implements Record {
    String id;
    String foreignKey;

    public String getForeignKey() {
        return foreignKey;
    }
}

private static class RecordB implements Record {
    String id;
    String foreignKey;

    public String getForeignKey() {
        return foreignKey;
    }
}

private static class RecordC implements Record {
    String id;
    String foreignKey;

    public String getForeignKey() {
        return foreignKey;
    }
}

}

Hope this helps.

Upvotes: 2
