EveMsc

Reputation: 3

How to re-consume messages in Kafka

I am very new to Kafka and I am working on a project to learn and understand Kafka. I am running Kafka on my laptop so I have 1 consumer and 1 producer and I'm working with Java (Spring Boot) to listen to those streams and consume the messages.

Let's say I have 2 different groups created, called "automatic" and "manual". For the "automatic" one, I do not want to act on the messages right away. I want to aggregate them for 1 minute, and when the minute passes, fire off a custom event.

But for the "manual" one, I want to consume the message and fire off the event right away.

When I send a message from the producer, it goes to a common topic, and there is a property in the message which says whether it is a "manual" or "automatic" type.

Here are my Kafka binding declarations in my application.properties file.

spring.cloud.stream.kafka.bindings.automatic.consumer.configuration.client.id=automatic-consumption-event
spring.cloud.stream.bindings.automatic.destination=main.event
spring.cloud.stream.bindings.automatic.binder=test-stream-app
spring.cloud.stream.bindings.automatic.group=consumer-automatic-group
spring.cloud.stream.bindings.automatic.consumer.concurrency=1


spring.cloud.stream.kafka.bindings.manual.consumer.configuration.client.id=manual-consumption-event
spring.cloud.stream.bindings.manual.destination=main.event
spring.cloud.stream.bindings.manual.binder=test-app
spring.cloud.stream.bindings.manual.group=consumer-manual-group
spring.cloud.stream.bindings.manual.consumer.concurrency=1

I have created 2 separate methods to consume the messages and perform different actions, like this:

private SessionWindows windows;

@PostConstruct
private void init() {
    this.windows = SessionWindows.with(Duration.ofSeconds(5)).grace(Duration.ZERO);
}

public void automatic(Ktream<string, CustomObjectType> eventStream) {
    eventStream.filter((x, y) -> y != null && !y.isManual(), Named.as("automatic_event"))
         .groupByKey(Grouped.with("participant_id", Serdes.String(), Serdes.Long()))
         .windowedBy(windows)
         .reduce(Long::sum, Named.as("participant_id_sum"))
         .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
         .toStream(Named.as("participant_id_stream"))
         .foreach(this::fireCustomEvent);
}

@StreamListener("manual-event")
public void manual(@Payload String payload) {
    var parsedObject = this.parseJSON(payload);
    if (!parsedObject.isManual()) {
        return;
    }
    this.fireCustomEvent();
}

private CustomObjectType parseJSON(String json) {
    return JSONObject.parseObject(json.substring(json.indexOf("{")), CustomObjectType.class);
}

private void fireCustomEvent(){
    // Should do something.
}

I ran the producer with this command on my system.

bin/kafka-console-producer.sh --topic main.event --property "parse.key=true" --property "key.separator=:"  --bootstrap-server localhost:62341

And I ran consumer with this command:

bin/kafka-console-consumer.sh --topic main.event --from-beginning --bootstrap-server localhost:62341

These are the events I'm passing by the producer.

123: {"eventHeader": null, "data": {"headline": "You are winner", "id": "42", "isManual": true}}
987: {"eventHeader": null, "data": {"headline": "You will win", "id": "43", "isManual": false}}

Whenever an event is sent by the producer, I can see my manual() method triggering with the message. It does the expected thing of taking the message and firing the event right away. But it is consuming both types of messages, and the problem is that the "automatic" messages are no longer being aggregated, because they have already been taken by the consumer.

Every time I restart my Spring Boot application, the automatic() method triggers, but it does not find any messages to filter because, as per my understanding, they were already consumed. Can someone help me figure out where I am going wrong?

Upvotes: 0

Views: 1118

Answers (1)

OneCricketeer

Reputation: 191681

I'm not sure I understand the question. Spring will start both functions "automatically". But you have a typo of Ktream in the automatic() parameters (it should be KStream).

consuming both the type of messages

Right... Because both exist in the same topic. Perhaps you want to use the branch/split operator in Kafka Streams to make a separate topic of all manual events, which your "manual" method reads instead?
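A minimal sketch of that split, assuming Kafka Streams 2.8+ (where split()/Branched replaced the older branch() API) and a hypothetical output topic main.event.manual:

```java
import java.util.Map;

import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

public class EventRouter {

    // Split the shared topic into a manual branch and an automatic branch.
    // "main.event.manual" is a hypothetical topic name for this sketch.
    public void route(KStream<String, CustomObjectType> eventStream) {
        Map<String, KStream<String, CustomObjectType>> branches = eventStream
                .split(Named.as("event-"))
                .branch((key, value) -> value != null && value.isManual(),
                        Branched.as("manual"))
                .defaultBranch(Branched.as("automatic"));

        // Write the manual branch out so the "manual" listener can
        // subscribe to its own topic instead of the shared one.
        branches.get("event-manual").to("main.event.manual");

        // The automatic branch stays in this topology and can be
        // windowed/aggregated as before.
        branches.get("event-automatic").foreach((key, value) -> {
            // aggregate here
        });
    }
}
```

Note the branch names are prefixed with the Named prefix passed to split(), which is why the map keys are "event-manual" and "event-automatic".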

because they were consumed already

That doesn't matter. What matters is that offsets were committed. You can reconsume a topic as many times as you want, as long as the data is retained in the topic.

To force reconsumption, you can use

  • KafkaConsumer.seek
  • kafka-consumer-groups --reset-offsets after you stop the app
  • give the app a new application.id/group.id along with consumer config auto.offset.reset=earliest
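For example, the second option might look like this (group name taken from your config; run it only after the app is stopped, since offsets cannot be reset while the group has active members):

```shell
bin/kafka-consumer-groups.sh --bootstrap-server localhost:62341 \
  --group consumer-automatic-group \
  --topic main.event \
  --reset-offsets --to-earliest --execute
```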

Upvotes: 1
