Reputation: 8505
I have a spring-cloud-stream
application with kafka binding. I would like to send and receive a message from the same topic from within the same executable(jar). I have my channel definitions such as below:-
public interface ChannelDefinition {
@Input("forum")
public SubscriableChannel readMessage();
@Output("forum")
public MessageChannel postMessage();
}
I use @StreamListener
to receive messages. I get all sorts of unexpected errors. At times, i receive
Is there a working example of the above usecase?
Upvotes: 9
Views: 4608
Reputation: 4692
For me, consuming from "input" didn't work. I needed to use method name on @Streamlistener
and needed to use @EnableBinding
, like below:
@Slf4j
@RequiredArgsConstructor
@EnableBinding(value = Channels.class)
public class Consumer {
@StreamListener("readMessage")
public void retrieve(Something req) {
log.info("Received {{}}", req);
}
}
Upvotes: 0
Reputation: 2046
Along with the answer above by Marius Bogoevici, here's an example of how to listen to that Input.
@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) {
logger.info("Subscribing...");
input.subscribe((message) -> {
logger.info("Received new message: {}", message);
});
}
Upvotes: 1
Reputation: 2400
This is a wrong way to define bindable channels (because of the use of the forum
name for both). We should be more thorough and fail fast on it, but you're binding both the input and the output to the same channel and creating a competing consumer within your application. That also explains your other issue with alternate messages.
What you should do is:
public interface ChannelDefinition {
@Input
public MessageChannel readMessage();
@Output
public MessageChannel postMessage();
}
And then use application properties to bind your channels to the same queue:
spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum
Upvotes: 14