Reputation: 116
I'm doing a join between two simple KTables. Suppose I have two topics with the following data:
First topic
1 | Victor C.
1 | Vadim C.
2 | Vasile P.
3 | Vitalie C.
4 | Oleg C.
Second topic
1 | Programmer
2 | Administrator
3 | Manager
The SQL behaviour is clear to me: when I run the following query, the output is easy to understand:
SELECT * FROM firstTable
INNER JOIN secondTable
ON firstTable.ID = secondTable.ID
1 | Victor C. | 1 | Programmer
1 | Vadim C. | 1 | Programmer
2 | Vasile P. | 2 | Administrator
3 | Vitalie C. | 3 | Manager
So, I'm playing around with Kafka Streams and tried to reproduce the same behaviour, but the results of the joined tables completely confused me. See the code snippet:
@Test
public void joinKTableToKTableWhereKeyValueIsIntegerAndString() throws Exception {
    InternalTestConfiguration itc = getInternalTestConfiguration();
    List<String> topics = Arrays.asList(itc.getFirstTopic(), itc.getSecondTopic(),
            itc.getProjectionTopic(), itc.getFirstKeyedTopic(), itc.getSecondKeyedTopic());
    KafkaStreams streams = null;
    try {
        Integer partitions = 1;
        Integer replication = 1;
        RestUtils.createTopics(topics, partitions, replication, new Properties());

        List<KeyValue<Integer, String>> employees = Arrays.asList(
                new KeyValue<>(1, "Victor C."),
                new KeyValue<>(1, "Vadim C."),
                new KeyValue<>(2, "Vasile P."),
                new KeyValue<>(3, "Vitalie C."),
                new KeyValue<>(4, "Oleg C.")
        );
        List<KeyValue<Integer, String>> specialities = Arrays.asList(
                new KeyValue<>(1, "Programmer"),
                new KeyValue<>(2, "Administrator"),
                new KeyValue<>(3, "Manager")
        );
        List<KeyValue<Integer, String>> expectedResults = Arrays.asList(
                new KeyValue<>(1, "Victor C./Programmer"),
                new KeyValue<>(1, "Vadim C./Programmer"),
                new KeyValue<>(2, "Vasile P./Administrator"),
                new KeyValue<>(3, "Vitalie C./Manager")
        );

        final Serde<Integer> keySerde = Serdes.Integer();
        final Serde<String> valueSerde = Serdes.String();

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, itc.getAppIdConfig());
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

        KStreamBuilder builder = new KStreamBuilder();
        KTable<Integer, String> firstKTable = builder.table(keySerde, valueSerde, itc.getFirstTopic(), itc.getFirstStore());
        KTable<Integer, String> secondKTable = builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore());
        KTable<Integer, String> projectionKTable = firstKTable.join(secondKTable, (l, r) -> l + "/" + r);
        projectionKTable.to(keySerde, valueSerde, itc.getProjectionTopic());

        streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(itc.getFirstTopic(), employees, cfg1);

        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(itc.getSecondTopic(), specialities, cfg2);

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, itc.getGroupIdConfig());
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        List<KeyValue<Integer, String>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, itc.getProjectionTopic(), expectedResults.size());
        assertThat(actualResults).containsExactlyElementsOf(expectedResults);
    } finally {
        if (streams != null) {
            streams.close();
        }
        RestUtils.deleteTopics(topics);
    }
}
I expected the same results as in SQL, but that is not what I get.
Results with streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0) commented out (caching enabled):
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
Results with streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); enabled (caching disabled):
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
Either way, neither result matches the SQL output. Please help me understand this behaviour, because it's driving me crazy.
Upvotes: 1
Views: 192
Reputation: 2145
In your SQL comparison, the number does not seem to be a primary key, because both Victor C. and Vadim C. are associated with 1. This doesn't work in a KTable: if the number is the key of the message, Vadim C. overwrites Victor C. That's why you only see three different people in the output.
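To make that upsert semantics concrete, here is a minimal sketch in plain Java (deliberately not the Streams API; `KTableUpsertSketch` is just an illustrative name): each KTable is modeled as a map where the latest value per key wins, and the inner join then works key by key, mirroring the `(l, r) -> l + "/" + r` joiner from your test:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KTableUpsertSketch {

    // A KTable is an upsert view of its topic: each key keeps only the
    // latest value, which a plain map models well enough for this sketch.
    static List<String> join() {
        Map<Integer, String> employees = new LinkedHashMap<>();
        employees.put(1, "Victor C.");
        employees.put(1, "Vadim C."); // same key: overwrites "Victor C."
        employees.put(2, "Vasile P.");
        employees.put(3, "Vitalie C.");
        employees.put(4, "Oleg C.");

        Map<Integer, String> specialities = new LinkedHashMap<>();
        specialities.put(1, "Programmer");
        specialities.put(2, "Administrator");
        specialities.put(3, "Manager");

        // Inner join on the key, mirroring the (l, r) -> l + "/" + r joiner.
        List<String> result = new ArrayList<>();
        for (Map.Entry<Integer, String> e : employees.entrySet()) {
            String speciality = specialities.get(e.getKey());
            if (speciality != null) { // key 4 has no match and is dropped
                result.add(e.getKey() + ", " + e.getValue() + "/" + speciality);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        join().forEach(System.out::println);
    }
}
```

This prints exactly the three lines of your caching-disabled output: Victor C. is gone before the join even runs, because key 1 already holds Vadim C.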
The second part of your question concerns the caching behaviour of the KTables. With caching enabled (your first example), the joins are triggered when the caches are flushed (every 30 seconds by default). There is also a known issue that produces duplicates when caching is enabled. This doesn't occur when you disable caching, so the second output, without duplicates, is the "correct" one.
I recently blogged about the join behaviour in Kafka 0.10.1 (so not the newest release, which changed a few of the semantics). Maybe that's helpful to you.
Upvotes: 3