I am trying to join two KTables:
KTable<String, RecordBean> recordsTable = builder.table(Serdes.String(),
        new JsonPOJOSerde<>(RecordBean.class),
        bidTopic, RECORDS_STORE);
KTable<String, ImpressionBean> impressionsTable = builder.table(Serdes.String(),
        new JsonPOJOSerde<>(ImpressionBean.class),
        impressionTopic, IMPRESSIONS_STORE);
KTable<String, RecordBean> mergedByTxId = recordsTable
        .join(impressionsTable, merge());
The merge function is very simple; I am just copying a value from one bean to the other:
public static <K extends BidInfo, V extends BidInfo> ValueJoiner<K, V, K> merge() {
    return (v1, v2) -> {
        // copy the winning bid amount from the right-hand bean into the left-hand one
        v1.setRtbWinningBidAmount(v2.getRtbWinningBidAmount());
        return v1;
    };
}
But for some reason the joiner is called twice for a single produced record. My streams and producer configs are below.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-impressions");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zookeeperConnect());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
        folder.newFolder("kafka-streams-tmp").getAbsolutePath());
return streamsConfiguration;
Producer config:
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return producerConfig;
Next, I submit a single record to each topic. Both records have the same key, so I expect to receive a single record as output.
IntegrationTestUtils.produceKeyValuesSynchronously(bidTopic,
        Arrays.asList(new KeyValue<>("1", getRecordBean("1"))),
        getProducerProperties());
IntegrationTestUtils.produceKeyValuesSynchronously(impressionTopic,
        Arrays.asList(new KeyValue<>("1", getImpressionBean("1"))),
        getProducerProperties());
List<KeyValue<String, String>> parsedRecord =
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
                outputTopic, 1);
But the ValueJoiner is triggered twice, and I get two identical output records instead of one. Both times it fires, the values from both tables are present, and I cannot figure out what triggers the second execution.
Without the join I cannot reproduce this behavior. I cannot find any working example of a join between two KTables, so I cannot understand what's wrong with my approach.
Here is a simpler snippet that demonstrates the same behavior:
KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> first = builder.table("stream1", "storage1");
KTable<String, String> second = builder.table("stream2", "storage2");
KTable<String, String> joined = first.join(second, (value1, value2) -> value1);
joined.to("output");
KafkaStreams streams = new KafkaStreams(builder, getStreamingProperties());
streams.start();
IntegrationTestUtils.produceKeyValuesSynchronously("stream1",
        Arrays.asList(new KeyValue<>("1", "first stream")),
        getProducerProperties());
IntegrationTestUtils.produceKeyValuesSynchronously("stream2",
        Arrays.asList(new KeyValue<>("1", "second stream")),
        getProducerProperties());
List<KeyValue<String, String>> parsedRecord =
        IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(getConsumerProperties(),
                "output", 1);
I found the same behavior using leftJoin between two KTables and stumbled upon this post after googling. I don't know which version of kafka-streams you were using, but after debugging the Kafka Streams code, version 2.0.1 seems to deliberately send both old and new values in certain types of joins, so you get two calls to the ValueJoiner.
Take a look at the implementation of org.apache.kafka.streams.kstream.internals.KTableImpl#buildJoin, which constructs the join topology, as well as org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin.KTableKTableRightJoinProcessor#process, which dispatches it at runtime. In some scenarios the joiner is clearly invoked twice.
Here is some background on this behavior: https://issues.apache.org/jira/browse/KAFKA-2984
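To make the "old and new values" point concrete, here is a simplified model in plain Java. It is a hypothetical sketch of the pattern described in this answer, not the actual KTableImpl or KTableKTableRightJoin source, and the class names OldValueJoinModel and ToyChange are made up. When old values are requested downstream, a single input update can invoke the joiner twice: once for the new value and once to recompute the previous join result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

// Hypothetical, simplified model of one KTable-KTable inner-join step with
// "send old values" enabled. This is NOT Kafka's actual join processor code.
final class OldValueJoinModel {
    // Handles one update (a single input record) arriving on the left table.
    static ToyChange<String> processLeft(String key, ToyChange<String> change,
                                         Map<String, String> rightStore,
                                         BiFunction<String, String, String> joiner,
                                         boolean sendOldValues) {
        String right = rightStore.get(key);
        if (right == null) {
            return new ToyChange<>(null, null); // nothing to join with
        }
        String newResult = change.newValue == null ? null : joiner.apply(change.newValue, right);
        // When old values are requested, the previous join result is recomputed
        // as well: a second joiner call for the same input record.
        String oldResult = (sendOldValues && change.oldValue != null)
                ? joiner.apply(change.oldValue, right)
                : null;
        return new ToyChange<>(newResult, oldResult);
    }

    // Counts joiner invocations for one update of key "1" on the left table.
    static int countJoinerCalls(boolean sendOldValues) {
        Map<String, String> rightStore = new HashMap<>();
        rightStore.put("1", "second stream");
        AtomicInteger calls = new AtomicInteger();
        BiFunction<String, String, String> joiner = (v1, v2) -> {
            calls.incrementAndGet();
            return v1 + " | " + v2;
        };
        // Key "1" on the left changes from "old first" to "first stream".
        processLeft("1", new ToyChange<>("first stream", "old first"), rightStore, joiner, sendOldValues);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("sendOldValues=false: " + countJoinerCalls(false) + " call(s)");
        System.out.println("sendOldValues=true:  " + countJoinerCalls(true) + " call(s)");
    }
}

// Hypothetical stand-in for a change record carrying both new and old values.
final class ToyChange<V> {
    final V newValue;
    final V oldValue;

    ToyChange(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}
```

In this model the second call only happens when the key already had an old value, so whether you see it depends on the update history as well as on the join type.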
I got the following explanation after posting a similar question to the Confluent mailing list:
I think this might be related to caching. The caches for the two tables are flushed independently, so there is a chance you will get the same record twice. If stream1 and stream2 both receive a record for the same key, and the cache flushes, then:
The cache from stream1 will flush, perform the join, and produce a record.
The cache from stream2 will flush, perform the join, and produce a record.
Technically this is ok as the result of the join is another KTable, so the value in the KTable will be the correct value.
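The caching explanation can be reproduced with a toy model in plain Java. This is a hypothetical sketch (CacheFlushJoinModel and ToyCachedTable are invented names), not Kafka's caching implementation: lookups read through each table's unflushed cache, and the join runs only when a table's cache is flushed. To match what is reported below after disabling the cache, the model emits a null result when the other side is missing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

// Hypothetical toy model of two cached KTables feeding an inner join (not Kafka code).
final class CacheFlushJoinModel {
    // Replays the question's scenario: one record per table, same key "1".
    static List<String> run(boolean cachingEnabled, AtomicInteger joinerCalls) {
        ToyCachedTable left = new ToyCachedTable();
        ToyCachedTable right = new ToyCachedTable();
        List<String> output = new ArrayList<>();
        // Same joiner as the question's simplified example: keep the left value.
        BiFunction<String, String, String> joiner = (v1, v2) -> {
            joinerCalls.incrementAndGet();
            return v1;
        };

        left.put("1", "first stream");
        if (!cachingEnabled) {
            joinFlushed(left.flush(), left, right, joiner, output);
        }
        right.put("1", "second stream");
        if (!cachingEnabled) {
            joinFlushed(right.flush(), left, right, joiner, output);
        } else {
            // Both caches hold the key; they flush independently, one after the other.
            joinFlushed(left.flush(), left, right, joiner, output);
            joinFlushed(right.flush(), left, right, joiner, output);
        }
        return output;
    }

    // Inner join for every flushed key; null when either side is missing.
    static void joinFlushed(List<String> keys, ToyCachedTable left, ToyCachedTable right,
                            BiFunction<String, String, String> joiner, List<String> output) {
        for (String key : keys) {
            String l = left.get(key);
            String r = right.get(key);
            output.add(l != null && r != null ? joiner.apply(l, r) : null);
        }
    }

    public static void main(String[] args) {
        AtomicInteger cachedCalls = new AtomicInteger();
        System.out.println("cached:   " + run(true, cachedCalls) + ", joiner calls = " + cachedCalls.get());
        AtomicInteger uncachedCalls = new AtomicInteger();
        System.out.println("uncached: " + run(false, uncachedCalls) + ", joiner calls = " + uncachedCalls.get());
    }
}

// Hypothetical table with a write cache in front of its store.
final class ToyCachedTable {
    private final Map<String, String> cache = new LinkedHashMap<>(); // dirty, not yet flushed
    private final Map<String, String> store = new HashMap<>();       // flushed state

    void put(String key, String value) {
        cache.put(key, value);
    }

    // Reads see unflushed cache entries first, then the flushed store.
    String get(String key) {
        return cache.containsKey(key) ? cache.get(key) : store.get(key);
    }

    // Moves dirty entries into the store and returns the keys to forward downstream.
    List<String> flush() {
        List<String> flushed = new ArrayList<>(cache.keySet());
        store.putAll(cache);
        cache.clear();
        return flushed;
    }
}
```

With caching, both flushes see both values, so the joiner runs twice and emits two identical records; reading that output back as a table (last value per key) still yields a single "first stream" for key "1". Without caching, each update is joined as it arrives, so the first one finds no match and only the second calls the joiner.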
After setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0, the issue was resolved. I still get two records, but now one of them is a join with null, which is much clearer behavior according to the join semantics document linked above.
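For reference, here is a sketch of the question's streams configuration with the record cache disabled. To keep it compilable without Kafka on the classpath, it uses the plain string keys behind the StreamsConfig/ConsumerConfig constants (for example, CACHE_MAX_BYTES_BUFFERING_CONFIG is "cache.max.bytes.buffering"). The ZooKeeper setting is left out, and the class name NoCacheStreamsConfig, the bootstrap address and the state directory in main are hypothetical placeholders.

```java
import java.util.Properties;

// Hypothetical helper; string keys equal the StreamsConfig/ConsumerConfig constants.
final class NoCacheStreamsConfig {
    static Properties build(String bootstrapServers, String stateDir) {
        Properties props = new Properties();
        props.put("application.id", "join-impressions");      // APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers);      // BOOTSTRAP_SERVERS_CONFIG
        props.put("auto.offset.reset", "earliest");            // AUTO_OFFSET_RESET_CONFIG
        props.put("state.dir", stateDir);                      // STATE_DIR_CONFIG
        // 0 bytes of record cache: each KTable update is forwarded downstream
        // right away instead of being buffered until a flush.
        props.put("cache.max.bytes.buffering", 0);             // CACHE_MAX_BYTES_BUFFERING_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = build("localhost:9092", "/tmp/kafka-streams-tmp");
        System.out.println("cache.max.bytes.buffering = " + props.get("cache.max.bytes.buffering"));
    }
}
```

Disabling the cache means every intermediate update reaches the output topic, so downstream traffic grows. Newer Kafka releases deprecate cache.max.bytes.buffering in favor of statestore.cache.max.bytes, so check the docs for your version.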