nikolafon

Reputation: 11

Spring Cloud Stream doesn't use Kafka channel binder to send a message

I'm trying to achieve the following:

Use Spring Cloud Stream 2.1.3.RELEASE with the Kafka binder to send a message to an input channel and get publish-subscribe behaviour, where every consumer is notified of and can handle a message sent to a Kafka topic.

I understand that in Kafka, if every consumer belongs to its own consumer group, each of them will be able to read every message from a topic. In my case, Spring creates an anonymous, unique consumer group for every running instance of my Spring Boot application. The Spring Boot application has only one stream listener configured, listening on the input channel.
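
For reference, the anonymous-group behaviour follows from leaving the consumer `group` property unset on the binding; a sketch of the relevant config (the person-processors name is purely hypothetical):

spring:
  cloud.stream:
    bindings:
        input:
            destination: person-event-input
            # No 'group' set: each instance joins an anonymous, unique
            # consumer group and therefore receives every message.
            # Setting a shared group would make instances compete instead:
            # group: person-processors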

Test example case:

I configured an example Spring Cloud Stream app with an input channel bound to a Kafka topic, and I use a Spring REST controller to send a message to the input channel, expecting the message to be delivered to every running Spring Boot application instance. On startup I can see in both applications that the Kafka partition is assigned properly.

Problem:

However, when I send a message using sink.input().send(), Spring doesn't even send the message to the configured Kafka topic; instead, in the same thread, it triggers the @StreamListener method of the same application instance.

While debugging I can see that Spring has two handlers for the message: StreamListenerMessageHandler and KafkaProducerMessageHandler. Spring simply chains them, and if the first handler completes successfully it never goes any further. StreamListenerMessageHandler just invokes my @StreamListener method in the same thread, and the message never reaches Kafka.

Question:

Is this by design, and if so, why? How can I achieve the behaviour described at the beginning of the post?

PS: If I use KafkaTemplate and a @KafkaListener method, it works the way I want: the message is sent to the Kafka topic, and both application instances receive it and handle it in the @KafkaListener-annotated method.
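
For comparison, a minimal sketch of that plain spring-kafka setup (the class names and the per-instance group id expression are illustrative assumptions, not my actual code):

@RestController
@RequestMapping("/persons")
public class PlainKafkaPersonController {

    @Autowired
    private KafkaTemplate<String, PersonEvent> kafkaTemplate;

    @PostMapping("/events")
    public void createPerson(@RequestBody PersonEvent event) {
        // Goes through the Kafka producer, so the record lands on the topic
        kafkaTemplate.send("person-event-input", event);
    }
}

@Component
public class PersonEventKafkaListener {

    // A unique groupId per instance gives every instance its own consumer
    // group, so each one receives every message (publish-subscribe)
    @KafkaListener(topics = "person-event-input",
            groupId = "#{T(java.util.UUID).randomUUID().toString()}")
    public void handle(PersonEvent event) {
        // handled in every running instance
    }
}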

Code:

The stream listener method is configured the following way:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableTransactionManagement
public class ProcessorApplication {

    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    private final PersonRepository repository;

    public ProcessorApplication(PersonRepository repository) {
        this.repository = repository;
    }

    public static void main(String[] args) {
        SpringApplication.run(ProcessorApplication.class, args);
    }

    @Transactional
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public PersonEvent process(PersonEvent data) {
        logger.info("Received event={}", data);
        Person person = new Person();
        person.setName(data.getName());

        logger.info("Saving person={}", person);

        Person savedPerson = repository.save(person);

        PersonEvent event = new PersonEvent();
        event.setName(savedPerson.getName());
        event.setType("PersonSaved");
        logger.info("Sent event={}", event);
        return event;
    }
}

Sending a message to the input channel:

@RestController
@RequestMapping("/persons")
public class PersonController {

    @Autowired
    private Sink sink;

    @PostMapping("/events")
    public void createPerson(@RequestBody PersonEvent event) {
        sink.input().send(MessageBuilder.withPayload(event).build());
    }
}

Spring Cloud Stream config:

spring:
  cloud.stream:
    bindings:
        output.destination: person-event-output
        input.destination: person-event-input

Upvotes: 1

Views: 1646

Answers (1)

Gary Russell

Reputation: 174799

sink.input().send

You are bypassing the binder altogether and sending the message directly to the stream listener.

You need to send the message to Kafka (to the person-event-input topic); each stream listener will then receive the message from Kafka.

You need to configure another output binding and send it there, not directly to the input channel.
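
For illustration, a minimal sketch of that suggestion, using hypothetical names (PersonEventSource, personEventOut); the interface also has to be registered, e.g. via @EnableBinding(PersonEventSource.class):

public interface PersonEventSource {

    @Output("personEventOut")
    MessageChannel personEventOut();
}

@RestController
@RequestMapping("/persons")
public class PersonController {

    @Autowired
    private PersonEventSource source;

    @PostMapping("/events")
    public void createPerson(@RequestBody PersonEvent event) {
        // Publishes through the Kafka binder instead of invoking the
        // local @StreamListener directly
        source.personEventOut().send(MessageBuilder.withPayload(event).build());
    }
}

with the new binding pointed at the input topic:

spring:
  cloud.stream:
    bindings:
        personEventOut.destination: person-event-input

The message then goes out through Kafka and comes back in on the input binding of every running instance.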

Upvotes: 1
