I am trying to join two Kafka topics: one read as a KStream and the other as a KTable. The left join fails, complaining that the state store for the processor is not present. I looked at many code samples in the Kafka GitHub repository and elsewhere, and none of them create a StateStore explicitly in the KStream client code. Please let me know what is missing from the code below.
The application stream is left-joined with the users table to emit records that contain both the application and its user. Each application has an owner, who is a user.
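To make the left-join semantics concrete, here is a plain-Java model (Java 9+ for `List.of`, `Map.of`, and `Map.entry`) of what a KStream-KTable left join emits. It is an illustration only, not how Kafka Streams implements the join, and the record shapes (stream records already re-keyed as `(ownerId, appId)`, a table of `userId -> userName`) are assumptions. Each stream record is matched against the table's current contents and always produces one output; when the key is missing from the table, the table side is `null`. That means a joiner that calls `user.getUserName()` unconditionally, as in the code below, would throw a `NullPointerException` for applications whose owner is not in the users table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of KStream-KTable left-join semantics.
 * Conceptual illustration only; this is NOT Kafka Streams' implementation.
 */
public class StreamTableLeftJoinModel {

    /**
     * @param streamRecords stream records already re-keyed by owner: (ownerId, appId)
     * @param usersTable    current table contents: userId -> userName
     * @return one joined record per stream record, in stream order
     */
    static List<String> leftJoin(List<Map.Entry<String, String>> streamRecords,
                                 Map<String, String> usersTable) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<String, String> entry : streamRecords) {
            // Look up the table by the stream record's key; null when there is no match.
            String userName = usersTable.get(entry.getKey());
            // A left join still emits the stream record; the table side is simply null.
            joined.add("a:" + entry.getValue() + " u:" + userName);
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> out = leftJoin(
                List.of(Map.entry("u1", "app1"), Map.entry("u2", "app2")),
                Map.of("u1", "alice"));
        out.forEach(System.out::println); // prints "a:app1 u:alice" then "a:app2 u:null"
    }
}
```

In the real join, a null-safe joiner such as `(app, user) -> "a:" + app + " u:" + (user == null ? null : user.getUserName())` avoids the exception.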
Kafka version: 1.1.0
Thanks
```java
public void process() {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
    config.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Common.KAFKA_SOCKET);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomSerdes.applicationSerde);
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
    config.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

    // User properties: userid, username
    KTable<String, User> users = new StreamsBuilder().table(TOPIC_USERS,
            Consumed.with(Serdes.String(), CustomSerdes.serdeFor(User.class)));

    StreamsBuilder builder = new StreamsBuilder();
    // Application properties: id, name
    KStream<String, Application> stream = builder.stream(TOPIC_APPLICATIONS);
    stream
            .map((appId, app) -> KeyValue.pair(app.getOwnerId(), app.getAppId()))
            .leftJoin(users, (app, user) -> "a:" + app + " u:" + user.getUserName())
            .to(OUTPUT_TOPIC);

    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    StreamsManager.startAndHandleShutdown(streams);
}
```
Error:

```
Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore topic-users-STATE-STORE-0000000000 is not added yet.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716)
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:797)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:817)
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:805)
    at com.test.streams.users.AppWithUserConsumerMain.process(AppWithUserConsumerMain.java:50)
```
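To use a join, both sides of it (in your case the `KStream` and the `KTable`) must be created from the same `StreamsBuilder`, so that they belong to the same topology.

In your case you created two `StreamsBuilder` instances, so the `KStream` and the `KTable` don't belong to the same topology. The `KTable`'s state store (`topic-users-STATE-STORE-0000000000`) is registered only in the first builder's topology, which is never built. When the join processor in the second builder tries to connect to that store, the topology builder cannot find it and reports that the store "is not added yet".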
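A minimal sketch of the corrected topology follows. To keep it self-contained it assumes String keys and values throughout instead of the question's `User`, `Application`, and `CustomSerdes` classes, and the topic names and the `localhost:9092` broker address are placeholders. The essential change is that `builder.table(...)` and `builder.stream(...)` are called on the same `StreamsBuilder`. Because `map` changes the key (forcing a repartition before the join) and the value type, the sketch also passes explicit serdes through `Joined` and `Produced` rather than relying on the configured default value serde.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class SingleBuilderJoin {

    static Topology buildTopology(String usersTopic, String appsTopic, String outputTopic) {
        // ONE builder for everything: the KTable's state store and the join
        // processor that reads it end up in the same topology.
        StreamsBuilder builder = new StreamsBuilder();

        // Assumption: users topic is keyed by userId with the userName as a String value.
        KTable<String, String> users = builder.table(usersTopic,
                Consumed.with(Serdes.String(), Serdes.String()));

        // Assumption: applications topic is keyed by appId with the ownerId as a String value.
        KStream<String, String> apps = builder.stream(appsTopic,
                Consumed.with(Serdes.String(), Serdes.String()));

        apps
                // Re-key by owner so the stream key matches the users table key.
                .map((appId, ownerId) -> KeyValue.pair(ownerId, appId))
                .leftJoin(users,
                        // userName is null when the owner is not (yet) in the table.
                        (appId, userName) -> "a:" + appId + " u:" + userName,
                        // Serdes for the repartition topic created after map().
                        Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-with-user");     // placeholder id
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        KafkaStreams streams = new KafkaStreams(
                buildTopology("topic-users", "topic-applications", "app-with-user-output"),
                config);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
```

Building the topology with a single builder is enough to get past the `TopologyException`; calling `buildTopology(...).describe()` is a quick way to check the wiring without a running broker.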