Reputation: 135
I'm trying to set up a project with Spring Cloud Stream and Kafka. I managed to build a simple example in which a listener gets messages from a topic and, after processing them, sends the output to another topic.
My listener and channels are configured like this:
@Component
public class FileEventListener {

    private FileEventProcessorService fileEventProcessorService;

    @Autowired
    public FileEventListener(FileEventProcessorService fileEventProcessorService) {
        this.fileEventProcessorService = fileEventProcessorService;
    }

    @StreamListener(target = FileEventStreams.INPUT)
    public void handleLine(@Payload(required = false) String jsonData) {
        this.fileEventProcessorService.process(jsonData);
    }
}

public interface FileEventStreams {

    String INPUT = "file_events";
    String OUTPUT = "raw_lines";

    @Input(INPUT)
    SubscribableChannel inboundFileEventChannel();

    @Output(OUTPUT)
    MessageChannel outboundRawLinesChannel();
}
The problem with this example is that when the service starts, it doesn't check for messages that already exist in the topic; it only processes messages that are sent after it started. I'm very new to Spring Cloud Stream and Kafka, but from what I've read, this behavior may be related to the fact that I'm using a SubscribableChannel. I tried using a QueueChannel instead, to see how it works, but I got the following exception:
Error creating bean with name ... nested exception is java.lang.IllegalStateException: No factory found for binding target type: org.springframework.integration.channel.QueueChannel among registered factories: channelFactory,messageSourceFactory
So, my questions are:
If QueueChannel is not the right choice for achieving the behavior explained in 1.), what do I have to add to my project to be able to use this type of channel?
Thanks!
Upvotes: 1
Views: 972
Reputation: 174554
Add spring.cloud.stream.bindings.file_events.group=foo to your configuration.
You cannot use a PollableChannel for a binding; it must be a SubscribableChannel.
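To illustrate why setting a group matters: as far as I understand the Kafka binder, a binding without a group joins an anonymous consumer group that starts at the latest offset, while a named group starts at the earliest offset the first time it is seen and afterwards resumes from its committed offset. The sketch below is only a toy in-memory model of that behavior; ToyTopic, subscribe and poll are hypothetical names for illustration, not Kafka or Spring APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group offsets; NOT the Kafka or Spring Cloud Stream API. */
public class OffsetResetSketch {

    static class ToyTopic {
        private final List<String> log = new ArrayList<>();
        private final Map<String, Integer> committed = new HashMap<>();
        private int anonymousCounter = 0;

        void send(String message) {
            log.add(message);
        }

        /**
         * Registers a consumer. A null group models an anonymous consumer:
         * it gets a fresh group name and starts at the end of the log.
         * A named group starts at offset 0 the first time it is seen and
         * keeps its committed offset on later subscriptions.
         */
        String subscribe(String group) {
            boolean anonymous = (group == null);
            String name = anonymous ? "anonymous-" + (anonymousCounter++) : group;
            int start = anonymous ? log.size() : 0;
            committed.putIfAbsent(name, start);
            return name;
        }

        /** Returns everything after the committed offset, then commits the new position. */
        List<String> poll(String name) {
            int from = committed.get(name);
            List<String> batch = new ArrayList<>(log.subList(from, log.size()));
            committed.put(name, log.size());
            return batch;
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.send("event-1");
        topic.send("event-2"); // both sent before any consumer exists

        String anon = topic.subscribe(null);  // no group configured
        String foo = topic.subscribe("foo");  // group=foo
        topic.send("event-3");

        System.out.println("anonymous: " + topic.poll(anon)); // [event-3]
        System.out.println("group foo: " + topic.poll(foo));  // [event-1, event-2, event-3]

        // Simulated restart: the named group resumes from its committed offset.
        String fooAgain = topic.subscribe("foo");
        topic.send("event-4");
        System.out.println("foo after restart: " + topic.poll(fooAgain)); // [event-4]
    }
}
```

In this model the anonymous consumer never sees event-1 and event-2, which matches the symptom in the question, while the consumer in group foo reads them once and does not reprocess them after a restart.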
Upvotes: 1