Carloso
Carloso

Reputation: 135

Springboot cloud Stream with Kafka

I'm trying to setup a project with Springboot cloud Stream with Kafka. I managed to build a simple example, where a listener gets messages from a topic and after processed it, it sends the output to another topic.

My listener and channels are configured like this:

@Component
public class FileEventListener {
    private FileEventProcessorService fileEventProcessorService;

    @Autowired
    public FileEventListener(FileEventProcessorService fileEventProcessorService) {
        this.fileEventProcessorService = fileEventProcessorService;
    }

    @StreamListener(target = FileEventStreams.INPUT)
    public void handleLine(@Payload(required = false) String jsonData) {
        this.fileEventProcessorService.process(jsonData);
    }
}

public interface FileEventStreams {
    String INPUT = "file_events";
    String OUTPUT = "raw_lines";

    @Input(INPUT)
    SubscribableChannel inboundFileEventChannel();

    @Output(OUTPUT)
    MessageChannel outboundRawLinesChannel();
}

The problem with this example is that when the service starts, it doesn't check for messages that already exist in the topic, it only process those messages that are sent after it started. I'm very new to Springboot stream and kafka, but for what I've read, this behavior may correspond to the fact that I'm using a SubscribableChannel. I tried to use a QueueChannel for example, to see how it works but I found the following exception:

Error creating bean with name ... nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.springframework.integration.channel.QueueChannel among registered factories: channelFactory,messageSourceFactory

So, my questions are:

  1. If I want to process all messages that exists in the topic once the application starts (and also messages are processed by only one consumer), I'm on the right path?
  2. Even if QueueChannel is not the right choice for achieve the behavior explained in 1.) What do I have to add to my project to be able to use this type of channel?

Thanks!

Upvotes: 1

Views: 972

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

  1. Add spring.cloud.stream.bindings.file_events.group=foo

    • anonymous groups consume from the end of the topic only, bindings with a group consume from the beginning, by default.
  2. You cannot use a PollableChannel for a binding, it must be a SubscribableChannel.

Upvotes: 1

Related Questions