stphngrtz

Reputation: 221

Apache Kafka - is it possible to lose messages on error?

I'm digging into Apache Kafka with Spring Cloud Stream and observed some behavior that makes me wonder whether I'm doing something wrong or whether it is working as intended, which I can hardly believe:

It is possible to lose messages on error!?

My setup is as simple as possible. A single Kafka broker and a topic with only 1 partition. Broker, topic, producer and consumer with default settings (auto-ack is true).

testcase 1

Works as expected.

testcase 2

message1 will be skipped because the committed offset has been set to message3. This is what troubles me. I don't want the consumer to continue with subsequent messages as long as a prior message has not been processed successfully.

Has anyone experienced the same behavior and/or could maybe guide me on how to change this?

Thanks in advance!


Update: as requested, some code snippets

Create the topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

Connect a producer

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

Create a maven project with

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>

...

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>


<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

Add the following application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-topic
          contentType: text/plain
          group: test-group
          consumer:
            header-mode: raw
      kafka:
        binder:
          zkNodes: localhost:2181
          brokers: localhost:9092

Add the following Application.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        if ("message1".equals(message.getPayload())) {
            throw new RuntimeException();
        }
        log.info("Successfully processed message {}", message.getPayload());
    }
}

That should be it. Run the application and use the console-producer to produce messages.

Upvotes: 3

Views: 4344

Answers (3)

Arek

Reputation: 3176

Kafka gives you the runtime, but you have the power of choice: in some configurations messages can be lost or skipped, in others they cannot, so you need to prepare the configuration according to your needs. IMO you should investigate some of the Spring Cloud Stream settings further. You can also play around with disabling auto-commit and committing offsets by hand.
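A minimal sketch of that route, assuming the Kafka binder from the question: setting the binder's autoCommitOffset consumer property to false makes the binder attach an Acknowledgment header to every message, and the offset is only committed when acknowledge() is called, so after a restart the consumer resumes from the last acknowledged offset. The process method below is a hypothetical placeholder.

spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              autoCommitOffset: false

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public class ManualAckListener {

    @StreamListener(Sink.INPUT)
    public void consume(Message<String> message) {
        // With autoCommitOffset=false the binder adds an Acknowledgment header.
        Acknowledgment ack = message.getHeaders()
                .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        process(message.getPayload()); // may throw; the offset stays uncommitted
        ack.acknowledge();             // commit only after successful processing
    }

    private void process(String payload) { /* hypothetical processing step */ }
}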

Upvotes: 0

Vinicius Carvalho

Reputation: 4156

You should configure a DLQ for such cases. If your message could not be consumed after three retries, it most likely won't be consumed at all, or it needs special treatment. Set up a DLQ where the poison message can land, and you won't lose messages.
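A hedged sketch of that setup with the Kafka binder from the question: enableDlq on the binding's consumer routes a message to a dead-letter topic once the binding's maxAttempts retries (3 by default) are exhausted; by default the DLQ topic is named error.<destination>.<group>. The values below are illustrative.

spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            maxAttempts: 3        # retries before the message is given up on
      kafka:
        bindings:
          input:
            consumer:
              enableDlq: true     # route exhausted messages to the DLQ topic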

Upvotes: 0

VIJ

Reputation: 1636

In Kafka, each message comes with an offset. Your consumer application can keep track of offsets and, if one was skipped or missed, use the consumer.seek method to fetch that specific message instead of consuming the next one.

Offsets are sequential and incremental in nature.

In your case, use manual commit.

I could suggest the following steps (a sketch follows the list):

  1. After the poll, first check the previously committed offset and request the next offset value.

  2. Once a message has been consumed and processed successfully, save the offset value of that message in some internal memory or table, and resume from it during the next poll.
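A minimal sketch of those steps using the plain kafka-clients consumer API (outside Spring Cloud Stream), assuming the topic and group from the question; process is a hypothetical placeholder. Auto-commit is disabled, the offset is committed per message after successful processing, and on failure the consumer seeks back so the same message is re-read before any later one:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false"); // commit offsets by hand
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        TopicPartition tp = new TopicPartition("test-topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        process(record.value()); // hypothetical processing step
                        // The committed offset is the position of the NEXT
                        // message to read, hence offset + 1.
                        consumer.commitSync(Collections.singletonMap(
                                tp, new OffsetAndMetadata(record.offset() + 1)));
                    } catch (Exception e) {
                        // Rewind to the failed message so it is re-read on the
                        // next poll, instead of moving on to later messages.
                        consumer.seek(tp, record.offset());
                        break;
                    }
                }
            }
        }
    }

    private static void process(String payload) { /* ... */ }
}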

The link below will not serve your use case exactly, but it should give you a fair idea.

Refer Example

Upvotes: 1
