Reputation: 221
I'm digging into Apache Kafka with Spring Cloud Stream and observed some behavior that makes me wonder whether I'm doing something wrong or whether it is working as intended - which I highly doubt:
It is possible to lose messages on error!?
My setup is as simple as possible. A single Kafka broker and a topic with only 1 partition. Broker, topic, producer and consumer with default settings (auto-ack is true).
testcase 1
Produce message1 and message2. The consumer log shows:
message1, retry
message1, retry
message1, retry
message2, retry
message2, retry
message2, retry
message1, retry
message1, retry
message1, retry
message2, retry
message2, retry
message2, retry
Works as expected.
testcase 2
Produce message1 and message2. The consumer log shows:
message1
message1, retry
message1, retry
message1, retry
message2
Then produce message3. The consumer log shows:
message3
message1 will be skipped because the committed offset has been set to message3. This is what troubles me. I don't want the consumer to continue with new messages as long as prior messages have not been successfully processed.
Has anyone experienced the same behavior and/or could guide me on how to change this?
Thanks in advance!
Update: as requested, some code snippets
Create the topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
Connect a producer
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
Create a maven project with
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>
...
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
Add the following application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-topic
          contentType: text/plain
          group: test-group
          consumer:
            header-mode: raw
      kafka:
        binder:
          zkNodes: localhost:2181
          brokers: localhost:9092
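For reference, the retry behavior visible in the logs above comes from the binding's consumer properties, which I left at their defaults. They can be tuned per binding (shown here with their default values; property names as in the Spring Cloud Stream reference):

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            # total number of processing attempts, including the first (default 3)
            maxAttempts: 3
            # initial and maximum backoff between retries, in milliseconds
            backOffInitialInterval: 1000
            backOffMaxInterval: 10000
```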
Add the following Application.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        if ("message1".equals(message.getPayload())) {
            throw new RuntimeException();
        }
        log.info("Successfully processed message {}", message.getPayload());
    }
}
That should be it. Run the application and use the console-producer to produce messages.
Upvotes: 3
Views: 4344
Reputation: 3176
Kafka gives you a runtime, but you have the power of choice. In some scenarios messages can be lost or skipped, in others not - you need to prepare the configuration according to your needs. IMO you should investigate the Spring Cloud Stream consumer settings further. You can also play around with disabling auto-commit and committing offsets "by hand".
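As a sketch against the configuration from the question: the Kafka binder lets you switch off auto-commit per binding, after which the binder puts an acknowledgment into the message headers that the listener must invoke itself:

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              # the binder no longer commits offsets on its own;
              # the listener must acknowledge each message explicitly
              autoCommitOffset: false
```

The listener can then obtain the `Acknowledgment` from the `KafkaHeaders.ACKNOWLEDGMENT` message header and call `acknowledge()` only after the message was processed successfully, so the offset never advances past a failed message.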
Upvotes: 0
Reputation: 4156
You should configure a DLQ for such cases. If your message could not be consumed after 3 retries, it most likely won't be consumed at all or it needs special treatment. Set up a DLQ where the poisonous message can land, and you won't lose messages.
Upvotes: 0
Reputation: 1636
In Kafka each message comes with an offset. Your consumer application can keep track of offsets, and if any offset was skipped or missed, then instead of consuming the next message you can use the consumer.seek method to go back to the specific message that is missing. Offsets are sequential and incremented per partition.
In your case, use manual commit. I would suggest the following steps:
After the poll, first check the previously committed offset and request the next offset value.
Once a message is consumed and processed successfully, save the offset of the successfully processed message in some internal memory or table, and consult it during the next poll.
The below link will not serve your use case exactly, but you can get a fair idea from it.
Refer Example
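The bookkeeping described above can be sketched with plain Java (class and method names are made up for illustration; real code would wire this around the consumer's seek call):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the offset bookkeeping: remember the last offset that was
// successfully processed per partition, detect gaps in the incoming
// offsets, and compute where the consumer should seek back to.
public class OffsetTracker {
    private final Map<Integer, Long> lastProcessed = new HashMap<>();

    // Record a successfully processed offset for a partition.
    public void markProcessed(int partition, long offset) {
        lastProcessed.merge(partition, offset, Math::max);
    }

    // True if the incoming offset is exactly the next one we expect.
    public boolean isNextExpected(int partition, long offset) {
        Long last = lastProcessed.get(partition);
        return last == null ? offset == 0 : offset == last + 1;
    }

    // Offset to seek back to so that everything not yet successfully
    // processed is re-read.
    public long seekPosition(int partition) {
        Long last = lastProcessed.get(partition);
        return last == null ? 0L : last + 1;
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.markProcessed(0, 0);                      // offset 0 processed ok
        // offset 1 fails -> nothing recorded for it
        System.out.println(tracker.isNextExpected(0, 2)); // false: there is a gap
        System.out.println(tracker.seekPosition(0));      // 1: re-read offset 1
    }
}
```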
Upvotes: 1