victor.chicu

Reputation: 116

Kafka Streams wrong joined results of 2 simple KTables

I'm joining two simple KTables. Suppose I have two topics with the following data:

First topic

1 | Victor C.

1 | Vadim C.

2 | Vasile P.

3 | Vitalie C.

4 | Oleg C.

Second topic

1 | Programmer

2 | Administrator

3 | Manager

The SQL behaviour is obvious when I run the following query, and the output is perfectly understandable to me:

SELECT * FROM firstTable
INNER JOIN secondTable
ON firstTable.ID = secondTable.ID

1 | Victor C. | 1 | Programmer

1 | Vadim C. | 1 | Programmer

2 | Vasile P. | 2 | Administrator

3 | Vitalie C. | 3 | Manager

So I'm playing around with Kafka and tried to reproduce the same behaviour, but the results of the joined KTables completely confused me.

See the code snippet for details:

@Test
public void joinKTableToKTableWhereKeyValueIsIntegerAndString() throws Exception {
    InternalTestConfiguration itc = getInternalTestConfiguration();
    List<String> topics = Arrays.asList(itc.getFirstTopic(), itc.getSecondTopic(),
            itc.getProjectionTopic(), itc.getFirstKeyedTopic(), itc.getSecondKeyedTopic());
    KafkaStreams streams = null;

    try {
        Integer partitions = 1;
        Integer replication = 1;
        RestUtils.createTopics(topics, partitions, replication, new Properties());

        List<KeyValue<Integer, String>> employees = Arrays.asList(
            new KeyValue<>(1, "Victor C."),
            new KeyValue<>(1, "Vadim C."),
            new KeyValue<>(2, "Vasile P."),
            new KeyValue<>(3, "Vitalie C."),
            new KeyValue<>(4, "Oleg C.")
        );

        List<KeyValue<Integer, String>> specialities = Arrays.asList(
            new KeyValue<>(1, "Programmer"),
            new KeyValue<>(2, "Administrator"),
            new KeyValue<>(3, "Manager")
        );

        List<KeyValue<Integer, String>> expectedResults = Arrays.asList(
            new KeyValue<>(1, "Victor C./Programmer"),
            new KeyValue<>(1, "Vadim C./Programmer"),
            new KeyValue<>(2, "Vasile P./Administrator"),
            new KeyValue<>(3, "Vitalie C./Manager")
        );

        final Serde<Integer> keySerde = Serdes.Integer();
        final Serde<String> valueSerde = Serdes.String();

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, itc.getAppIdConfig());
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

        KStreamBuilder builder = new KStreamBuilder();

        KTable<Integer, String> firstKTable =
            builder.table(keySerde, valueSerde, itc.getFirstTopic(), itc.getFirstStore());
        KTable<Integer, String> secondKTable =
            builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore());
        KTable<Integer, String> projectionKTable =
            firstKTable.join(secondKTable, (l, r) -> l + "/" + r);

        projectionKTable.to(keySerde, valueSerde, itc.getProjectionTopic());

        streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(itc.getFirstTopic(), employees, cfg1);

        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(itc.getSecondTopic(), specialities, cfg2);

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, itc.getGroupIdConfig());
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        List<KeyValue<Integer, String>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, itc.getProjectionTopic(), expectedResults.size());

        assertThat(actualResults).containsExactlyElementsOf(expectedResults);
    } finally {
        if (streams != null) {
            streams.close();
        }

        RestUtils.deleteTopics(topics);
    }
}

I expected the same results as in SQL, but that's not what I get.

Results with the streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); line commented out (caching enabled):

1, Vadim C./Programmer

2, Vasile P./Administrator

3, Vitalie C./Manager

1, Vadim C./Programmer

2, Vasile P./Administrator

3, Vitalie C./Manager

Results with streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); enabled (caching disabled):

1, Vadim C./Programmer

2, Vasile P./Administrator

3, Vitalie C./Manager

Either way, neither result matches the SQL output. Please help me understand this, because it's driving me crazy :(

Upvotes: 1

Views: 192

Answers (1)

ftr

Reputation: 2145

In your SQL comparison, the number does not seem to be a primary key, because both Victor C. and Vadim C. are associated with 1.

This doesn't work with a KTable if the number is the key of the message: Vadim C. overwrites Victor C. That's why you only see three different people in the output.
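This upsert behaviour can be illustrated with a plain-Java sketch (hypothetical class name `KTableUpsertDemo`; this is not Kafka Streams API code): a KTable is essentially a changelog replayed into a key-value store, so a later record with the same key replaces the earlier one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KTableUpsertDemo {

    // Replay a changelog of (key, value) records into a table.
    // Later records with the same key overwrite earlier ones - upsert semantics.
    public static Map<Integer, String> replay(Object[][] records) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Object[] r : records) {
            table.put((Integer) r[0], (String) r[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        Object[][] employees = {
            {1, "Victor C."}, {1, "Vadim C."}, {2, "Vasile P."},
            {3, "Vitalie C."}, {4, "Oleg C."}
        };
        Map<Integer, String> table = replay(employees);
        // Key 1 now holds only the last write:
        System.out.println(table.get(1));  // prints "Vadim C."
        System.out.println(table.size());  // prints 4, not 5
    }
}
```

So by the time the join runs, "Victor C." is already gone from the table, which is exactly why only three joined rows can ever be produced.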

The second part of your question concerns the caching behaviour of the KTables. With caching enabled (your first example), the joins are triggered when the caches are flushed (every 30 seconds by default), and there is a known issue with duplicates when caching is enabled. This doesn't occur when you disable caching, so the second output is the "correct" one, without duplicates.

I recently blogged about join behaviour in Kafka 0.10.1 (not the newest release, which changed a few join semantics). Maybe that's helpful to you.
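To get the SQL-style result, each employee record would need its own unique key, with the speciality id carried inside the value and used as the lookup field. A plain-Java sketch of that lookup (hypothetical class name `SqlLikeJoinDemo`; simulating the join rather than using the Kafka Streams API) shows that all matching rows then survive:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SqlLikeJoinDemo {

    // Inner-join each employee row against the speciality table by id.
    // The employee entries are (specialityId, name) pairs with no key collisions
    // between distinct people, so nothing is overwritten before the join.
    public static List<String> join(List<Map.Entry<Integer, String>> employees,
                                    Map<Integer, String> specialities) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Integer, String> e : employees) {
            String spec = specialities.get(e.getKey());  // lookup by speciality id
            if (spec != null) {                          // inner join: drop non-matching rows
                rows.add(e.getValue() + "/" + spec);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> employees = Arrays.<Map.Entry<Integer, String>>asList(
            new AbstractMap.SimpleEntry<>(1, "Victor C."),
            new AbstractMap.SimpleEntry<>(1, "Vadim C."),
            new AbstractMap.SimpleEntry<>(2, "Vasile P."),
            new AbstractMap.SimpleEntry<>(3, "Vitalie C."),
            new AbstractMap.SimpleEntry<>(4, "Oleg C.")
        );
        Map<Integer, String> specialities = new HashMap<>();
        specialities.put(1, "Programmer");
        specialities.put(2, "Administrator");
        specialities.put(3, "Manager");

        // All four matching rows survive, including both employees with id 1;
        // Oleg C. (id 4) is dropped because there is no speciality 4.
        System.out.println(join(employees, specialities));
    }
}
```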

Upvotes: 3
