Jimm
Jimm

Reputation: 8505

how to send and receive from the same topic within spring cloud stream and kafka

I have a spring-cloud-stream application with kafka binding. I would like to send and receive a message from the same topic from within the same executable(jar). I have my channel definitions such as below:- public interface ChannelDefinition { @Input("forum") public SubscriableChannel readMessage(); @Output("forum") public MessageChannel postMessage(); }

I use @StreamListener to receive messages. I get all sorts of unexpected errors. At times, i receive

  1. No dispatcher found for unknown.message.channel for every other message
  2. If i attach a command line kafka subscriber to the above forum topic, it recieves every other message.
  3. My application receives every other message, which is exclusive set of messages from command line subscriber. I have made sure that my application subscribes under a specific group name.

Is there a working example of the above usecase?

Upvotes: 9

Views: 4608

Answers (3)

Sercan Ozdemir
Sercan Ozdemir

Reputation: 4692

For me, consuming from "input" didn't work. I needed to use method name on @Streamlistener and needed to use @EnableBinding, like below:

@Slf4j
@RequiredArgsConstructor
@EnableBinding(value = Channels.class)
public class Consumer {
    
    @StreamListener("readMessage")
    public void retrieve(Something req) {
        log.info("Received {{}}", req);
    }

}

Upvotes: 0

Tony Zampogna
Tony Zampogna

Reputation: 2046

Along with the answer above by Marius Bogoevici, here's an example of how to listen to that Input.

@StreamListener
public void handleNewOrder(@Input("input") SubscribableChannel input) {
    logger.info("Subscribing...");
    input.subscribe((message) -> {
        logger.info("Received new message: {}", message);
    });
}

Upvotes: 1

Marius Bogoevici
Marius Bogoevici

Reputation: 2400

This is a wrong way to define bindable channels (because of the use of the forum name for both). We should be more thorough and fail fast on it, but you're binding both the input and the output to the same channel and creating a competing consumer within your application. That also explains your other issue with alternate messages.

What you should do is:

public interface ChannelDefinition { 

   @Input
   public MessageChannel readMessage();

   @Output
   public MessageChannel postMessage();
}

And then use application properties to bind your channels to the same queue:

spring.cloud.stream.bindings.readMessage.destination=forum
spring.cloud.stream.bindings.postMessage.destination=forum

Upvotes: 14

Related Questions