Sergey Shcherbakov

Reputation: 4778

Spring Cloud Stream Kafka Streams: The number of downstream messages doesn't match the sum of messages sent to the topic

I have a Spring Boot based Spring Cloud Stream Kafka Streams Binder application. It defines a topology with the following piece in it:

[Image: diagram of the topology fragment, with message counts shown in green]

The numbers in green show the number of messages passed through the topology defined by the respective processors bound via Spring Cloud Stream Kafka Streams binder, here are the respective properties:

spring.cloud.stream.bindings:
  ...
  hint1Stream-out-0:
    destination: hints
  realityStream-out-0:
    destination: hints
  countStream-in-0:
    destination: hints

I am counting the messages that each processor produces / consumes using peek() calls, as follows:

return stream -> {
    stream
        // count every record entering the processor
        .peek((k, v) -> input0count.incrementAndGet())
        ...
        // count every record leaving the processor
        .peek((k, v) -> output0count.incrementAndGet());
};

I am starting my application from a unit test using Embedded Kafka with pretty much default settings:

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
                ...
                TOPIC_HINTS
        }
)
public class MyApplicationTests {
...

In my test I wait long enough for all published test messages to reach countStream:

CountDownLatch latch = new CountDownLatch(1);
...
publishFromCsv(...)
...
latch.await(30, TimeUnit.SECONDS);
logCounters();
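
One way to make the wait depend less on a fixed timeout is to poll a counter until it reaches the expected total. The following is only a minimal sketch in plain Java, assuming the counter is an AtomicLong (the snippets above only show incrementAndGet(), so the exact type is a guess). The class CounterAwait, the method awaitCount and the 50 ms poll interval are hypothetical and not part of the original test.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the original test code.
class CounterAwait {

    /**
     * Polls the counter until it reaches at least `expected` or the timeout elapses.
     * Returns true if the expected count was reached in time, false on timeout
     * or if the waiting thread was interrupted.
     */
    static boolean awaitCount(AtomicLong counter, long expected, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (counter.get() < expected) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before all messages were counted
            }
            try {
                Thread.sleep(50); // assumed poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong input0count = new AtomicLong();
        // Simulates a stream thread counting 100 records via peek().
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                input0count.incrementAndGet();
            }
        });
        consumer.start();
        boolean reached = awaitCount(input0count, 100, Duration.ofSeconds(5));
        consumer.join();
        System.out.println("reached=" + reached + ", count=" + input0count.get());
    }
}
```

A false return then separates "messages are still in flight" from "messages never arrived", which a fixed 30 second wait cannot do on its own.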

As you can see, the sum of the messages put into the "hints" topic doesn't match the count of messages on the countStream side: 1309 + 2589 = 3898, but countStream reports only 3786.

Am I missing some Kafka or Kafka Streams setting to flush every batch? Maybe my custom TimestampExtractor generates timestamps that are "too old"? (I'm pretty sure they are not less than zero.) Maybe this has something to do with Kafka log compaction?

What could be the reason for this mismatch?

Update

Checked the underlying topic offsets by executing

kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:60231 --topic hints

while the test was waiting for timeout.

The number of messages in the topic is equal to the sum of the two input stream counts, as expected. The number of messages that arrived at the countStream input is still a couple of dozen fewer than expected.
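
The offset check above can be turned into a number to compare against the peek() counters. This is only a sketch, assuming the tool prints one topic:partition:offset line per partition and that the topic's log starts at offset 0 with no transaction markers, so that the end offset equals the message count. The class OffsetSum and the sample line are hypothetical, not output from the actual test run.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for summing GetOffsetShell-style output lines.
class OffsetSum {

    /** Sums the offsets of all lines of the form topic:partition:offset for the given topic. */
    static long sumOffsets(List<String> lines, String topic) {
        long total = 0;
        for (String line : lines) {
            String trimmed = line.trim();
            int lastColon = trimmed.lastIndexOf(':');
            int midColon = trimmed.lastIndexOf(':', lastColon - 1);
            if (lastColon < 0 || midColon < 0) {
                continue; // not a topic:partition:offset line
            }
            if (!trimmed.substring(0, midColon).equals(topic)) {
                continue; // offsets of a different topic
            }
            total += Long.parseLong(trimmed.substring(lastColon + 1));
        }
        return total;
    }

    public static void main(String[] args) {
        // Illustrative line only: 3898 = 1309 + 2589, the sum of the two producer counts.
        List<String> output = Arrays.asList("hints:0:3898");
        System.out.println(sumOffsets(output, "hints"));
    }
}
```

With a single partition, as configured via @EmbeddedKafka(partitions = 1), the sum is just the one reported end offset.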

Other Kafka configuration in use:

spring.cloud.stream.kafka.streams:
    configuration:
      schema.registry.url: mock://torpedo-stream-registry
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      commit.interval.ms: 100

That corresponds to processing.guarantee = at_least_once. I could not test processing.guarantee = exactly_once, since that requires a cluster of at least 3 brokers.

Setting all of the following:

spring.cloud.stream.kafka.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams.binder.configuration:
  auto.offset.reset: earliest
spring.cloud.stream.kafka.streams:
  default:
    consumer:
      startOffset: earliest
spring.cloud.stream.bindings:
  countStream-in-0:
    destination: hints
    consumer:
      startOffset: earliest
      concurrency: 1

didn't help :(

What helped was leaving only stream.peek(..) in the countStream consumer, like this:

@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> input0count.incrementAndGet());
    };
}

In this case I immediately start getting the expected number of messages counted on the countStream side.

That means that the internals of my count consumer have an impact on the behaviour.

Here is its full version, which "doesn't work":

@Bean
public Consumer<KStream<String, Hint>> countStream() {
    return stream -> {
        KStream<String, Hint> kstream = stream.peek((k, v) -> notifyObservers(input0count.incrementAndGet()));

        KStream<String, Hint> realityStream = kstream
            .filter((key, hint) -> realityDetector.getName().equals(hint.getDetector()));

        KStream<String, Hint> hintsStream = kstream
            .filter((key, hint) -> !realityDetector.getName().equals(hint.getDetector()));

        this.countsTable = kstream
            .groupBy((key, hint) -> key.concat(":").concat(hint.getDetector()))
            .count(Materialized
                .as("countsTable"));

        this.countsByActionTable = kstream
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()))
            .count(Materialized
                .as("countsByActionTable"));

        this.countsByHintRealityTable = hintsStream
            .join(realityStream,
                (hint, real) -> {
                    hint.setReal(real.getHint());
                    return hint;
                }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
            .groupBy((key, hint) -> key.concat(":")
                .concat(hint.getDetector()).concat("|")
                .concat(hint.getHint().toString()).concat("-")
                .concat(hint.getReal().toString())
            )
            .count(Materialized
                .as("countsByHintRealityTable"));

    };
}

I am storing counts in several KTables there. This is what is happening inside of the Counts Consumer:

[Image: diagram of what happens inside the Counts Consumer]

Update 2

The last piece of the Count Consumer is apparently causing the initial unexpected behaviour:

this.countsByHintRealityTable = hintsStream
        .join(realityStream,
            (hint, real) -> {
                hint.setReal(real.getHint());
                return hint;
            }, JoinWindows.of(countStreamProperties.getJoinWindowSize()))
        .groupBy((key, hint) -> key.concat(":")
            .concat(hint.getDetector()).concat("|")
            .concat(hint.getHint().toString()).concat("-")
            .concat(hint.getReal().toString())
        )
        .count(Materialized
            .as("countsByHintRealityTable"));

Without it the message counts match as expected.

How can such downstream code affect the Consumer KStream input?

Upvotes: 2

Views: 2089

Answers (2)

abinet

Reputation: 2798

The messages can be deleted because of the retention policy. Changing the topology changes the amount of time needed for processing. If retention kicks in during processing, you can lose messages. It also depends on the offset reset policy.

Try setting log.retention.hours=-1. This disables retention for auto-created topics.

Upvotes: 1

Sergey Shcherbakov

Reputation: 4778

At first I thought the following had fixed the issue: splitting the Counter Consumer into two parts that are (from my perspective) fully equivalent to the single-consumer implementation:

[Image: diagram of the Counter Consumer split into two parts]

The message counts reported by peek() on both Consumer inputs show the expected number of messages.

But it turned out that the results were non-deterministic. Each run produced a different result, which sometimes still didn't match.

I found and deleted the following temporary folders that get created during the test run:

  • /tmp/kafka-streams/* (they all were empty)
  • /var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/spring* (these look to be temporary folders of the embedded Kafka)

After that, I have not been able to reproduce the issue with the same code.

The temporary directory that I had to clean is created in the spring-kafka-test EmbeddedKafkaBroker:
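
The manual cleanup described above could also be done from the test itself, for example before the application context starts. A minimal sketch using only the JDK follows. The class StateDirCleaner is hypothetical, and which directories to pass in (such as /tmp/kafka-streams) is an assumption based on the folders listed above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for removing leftover state directories between test runs.
class StateDirCleaner {

    /** Recursively deletes dir if it exists and returns the number of entries removed. */
    static int deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return 0;
        }
        try {
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(dir)) {
                // Reverse order so that children are deleted before their parents.
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
            return paths.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Usage: pass the directories to clean, e.g. /tmp/kafka-streams
        for (String arg : args) {
            System.out.println(arg + ": removed " + deleteRecursively(Paths.get(arg)) + " entries");
        }
    }
}
```

Calling it from a static setup method of the test class would be one option. Whether stale directories are really the cause of the mismatch remains unconfirmed.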

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java#L329

Shouldn't this folder be deleted automatically when the unit tests exit gracefully?

That is probably the responsibility of Kafka itself, but a similar bug there appears to have been fixed already: KAFKA-1258

I have set the Kafka broker log.dir to "target/kafka" in

kafka.properties

log.dir=target/kafka

MyApplicationTests.java

@RunWith(SpringRunner.class)
@SpringBootTest(
    properties = "spring.cloud.stream.kafka.binder.brokers=${spring.embedded.kafka.brokers}"
)
@EmbeddedKafka(partitions = 1,
        topics = {
                TOPIC_QUOTES,
                TOPIC_WINDOWS,
                TOPIC_HINTS,
                TOPIC_REAL
        },
        brokerPropertiesLocation = "kafka.properties"
)
@Slf4j
public class MyApplicationTests {

I can see the target/kafka folder fill up with temporary folders and files during the test run. It also gets deleted "by itself" when the test exits.

I still see some folders from ${java.io.tmpdir} in use in the test logs, e.g. /var/folders/ms/pqwfgz297b91gw_b8xymf1l00000gn/T/kafka-16220018198285185785/version-2/snapshot.0. They also get cleaned up.

In most cases my counts match now. Still, I think I have seen them not match once or twice.

Upvotes: 0
