0x SLC

Reputation: 165

Will multiple instances of Kstreams apps process stateful data if the repartitioned key isn't the same as the state store key?

Example: I have a passenger stream and a bus stream. Each record in the passenger stream has a passenger ID and a GPS value. Each record in the bus stream has a bus ID and one passenger ID (each time a new passenger gets on the bus, a new message is put on the bus topic with the bus ID and passenger ID in its payload).

I want to calculate each bus's mileage from the GPS values of its "primary" passenger (the one with the most reliable GPS data). The primary passenger changes from time to time.

I need to ensure that distance calculations for the same bus (with different passengers) are not happening on multiple streaming app instances at once.

My question:

Will the passenger data for a specific busID always be sent to the same partition? If the previous and current passenger messages for a bus ended up in separate partitions, they would be processed by separate stream instances, which would produce invalid GPS distance calculations.

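// Re-key the passenger stream by passenger ID so it can be joined with busTable (keyed by passenger ID).
passengerStream.map((k, v) -> new KeyValue<>(v.getPassengerId(), v))
        .join(busTable, busPassengerStatus::new,
            Joined.with(Serdes.String(), passengerStreamSerde, busPassengerJsonSerDe))
        // Re-key the joined records by bus ID ahead of the stateful distance calculation.
        .map((k, v) -> new KeyValue<>(v.getBusId(), v))
        // Stateful step: uses the rolling-distance store and the previous-passenger store.
        .transform(distanceProcessorSupplier, calculatedDistanceStoreSupplier.name(), previousPassengerStateStoreSupplier.name())
        //.print(Printed.toSysOut())
        .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), new JsonSerDe<>(CalculatedDistance.class)));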

Details

There is a bus KTable with the passenger ID as key and the bus ID as value. There are also two state stores: one keeps the previous record of the "primary" passenger (to calculate the distance between two points), and the other keeps a rolling distance, adding each new distance (computed from the current and previous GPS values) to the previous total.

Upvotes: 0

Views: 120

Answers (1)

Matthias J. Sax

Reputation: 62330

Data is partitioned based on a "partitioning strategy". By default, messages are partitioned by their key. Hence, if you don't specify a different partitioner, you can set the busId as the message key, and all records for the same bus will go to the same partition.
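To illustrate why equal keys end up in the same partition, here is a minimal standalone sketch of hash-based partitioning. It is not Kafka's code: the class and method names are hypothetical, and `Arrays.hashCode` is used as a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key. The shape of the computation (hash the key bytes, make the result non-negative, take it modulo the partition count) is what matters.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class; not part of Kafka.
final class KeyPartitioningSketch {

    // Maps a key to a partition in [0, numPartitions).
    // Assumption: Arrays.hashCode stands in for Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        int first = partitionFor("bus-42", numPartitions);
        int second = partitionFor("bus-42", numPartitions);
        // The same key always yields the same partition.
        System.out.println("same partition: " + (first == second));
    }
}
```

Because the result depends only on the key bytes and the partition count, every record keyed by the same busId lands in the same partition as long as the number of partitions does not change.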

If you cannot set the busId as the message key, you could implement a custom Partitioner that extracts the busId from the value and computes the partition the message should be written to, and then configure your producer to use it. As a third alternative, you can specify the partition explicitly when you create a ProducerRecord (in this case, the Partitioner won't be used to compute a partition).
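The value-based computation such a custom partitioner might perform can be sketched as follows. This is plain Java with no Kafka dependency so that it runs on its own; `BusPassengerStatus`, `partitionForValue`, and the class name are hypothetical, and `Arrays.hashCode` is again only a stand-in hash. In a real producer, logic like this would sit inside an implementation of `org.apache.kafka.clients.producer.Partitioner`, registered through the `partitioner.class` producer setting.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class; not part of Kafka.
final class BusIdPartitioningSketch {

    // Hypothetical value type: the message key is something other than the bus ID.
    static final class BusPassengerStatus {
        final String busId;
        final String passengerId;

        BusPassengerStatus(String busId, String passengerId) {
            this.busId = busId;
            this.passengerId = passengerId;
        }
    }

    // Picks the partition from the busId inside the value, ignoring the key.
    static int partitionForValue(BusPassengerStatus value, int numPartitions) {
        byte[] busIdBytes = value.busId.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash; clear the sign bit before taking the modulo.
        return (Arrays.hashCode(busIdBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        BusPassengerStatus previous = new BusPassengerStatus("bus-42", "passenger-1");
        BusPassengerStatus current = new BusPassengerStatus("bus-42", "passenger-2");
        // Different passengers on the same bus map to the same partition.
        System.out.println("same partition: "
                + (partitionForValue(previous, numPartitions)
                        == partitionForValue(current, numPartitions)));
    }
}
```

For the third alternative, the same computed number could instead be passed to the `ProducerRecord` constructor that takes an explicit partition, for example `new ProducerRecord<>(topic, partition, key, value)`.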

Upvotes: 1
