user10815402

Spring Cloud Stream does not send messages to Kafka when the producer and consumer are declared in the same application

I currently have a producer and a consumer configured in the same Spring Boot application. Oddly, the message that Spring Cloud Stream sends does not go through Kafka (I am monitoring the topic with kafka-console-consumer), yet the consumer still receives it, on the same thread as the producer.

If I remove the consumer handler (@StreamListener) from the application, the producer successfully sends the message to Kafka.

Is there a configuration setting for this? I need Spring Cloud Stream to send the message to Kafka by default.

Producer and consumer configuration:

@Component
public interface NotificationProcessor {
   
    String EMAIL_NOTIFICATION = "email-notification";
    @Input(EMAIL_NOTIFICATION)
    SubscribableChannel receiveEmail();
    @Output(EMAIL_NOTIFICATION)
    MessageChannel sendEmail();
}

Here is some of my configuration:

spring:
  cloud:
    stream:
      kafka:
        binder:
          autoAddPartitions: true
          brokers: ${KAFKA_BROKERS:localhost:9092}
          auto-create-topics: true
          configuration:
            auto.offset.reset: latest
      bindings:
        email-notification:
          group: ${EMAIL_GROUP:email-group-notification}
          destination: ${EMAIL_TOPIC:email-notification}
          contentType: application/json
          producer:
            partitionCount: 9
          consumer:
            partitioned: true
            concurrency: 3
      instance-count: 1
      instance-index: 0

An API to trigger sending a message:

@RestController
@RequestMapping("/api")
public class TestResource {
    private final Logger log = LoggerFactory.getLogger(TestResource.class);

    private final NotificationProcessor notificationProcessor;
    public TestResource(NotificationProcessor notificationProcessor) {
        this.notificationProcessor = notificationProcessor;
    }
 
    @ApiOperation(value = "Test api")
    @GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
    public ResponseEntity<Boolean> test2() {
        EmailMessage test = EmailMessage.builder()
                .to(Arrays.asList(Receiver.builder().email("[email protected]").build()))
                .type(EContentType.JSON)
                .build();
        log.info("send email message to kafka");
        notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
        return ResponseEntity.ok(Boolean.TRUE);
    }
}

And the consumer handler:

@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {

    private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);

    public NotificationProducer(){}

    @StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
    public void receiveEmail(@Payload Message<EmailMessage> message)  {
        log.info("Receive email message from kafka");
        EmailMessage emailMessage = message.getPayload();
    }
}

Upvotes: 1

Views: 1789

Answers (1)

Oleg Zhurakousky

Reputation: 6126

It is not clear from the provided information which channel you are sending the message to. Channels are internal and direct by default, so if you send to the same channel you subscribe to, you bypass the message broker (i.e., Kafka) completely. That would explain both symptoms (no broker traffic and the same thread).
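
To illustrate the "direct" behaviour described above, here is a minimal plain-Java sketch. It is not Spring code: `InMemoryDirectChannel` and `handlerThreadFor` are hypothetical names invented for this example, and the channel is simplified to a single subscriber. It only shows why a handler subscribed to the same in-memory channel runs on the sender's thread, with nothing in between that could reach a broker.

```java
import java.util.function.Consumer;

public class DirectChannelSketch {

    /** Hypothetical stand-in for a direct in-memory channel; not a Spring class. */
    static final class InMemoryDirectChannel<T> {
        private Consumer<T> subscriber;

        void subscribe(Consumer<T> handler) {
            this.subscriber = handler;
        }

        /** Hands the payload to the subscriber synchronously, on the caller's thread. */
        boolean send(T payload) {
            if (subscriber == null) {
                return false; // nobody is listening
            }
            subscriber.accept(payload);
            return true;
        }
    }

    /** Sends one message and returns the name of the thread that handled it. */
    static String handlerThreadFor(String payload) {
        InMemoryDirectChannel<String> channel = new InMemoryDirectChannel<>();
        String[] handledOn = new String[1];
        channel.subscribe(msg -> handledOn[0] = Thread.currentThread().getName());
        channel.send(payload);
        return handledOn[0];
    }

    public static void main(String[] args) {
        String sender = Thread.currentThread().getName();
        String handler = handlerThreadFor("hello");
        System.out.println("sender thread:  " + sender);
        System.out.println("handler thread: " + handler);
        System.out.println("same thread: " + sender.equals(handler));
    }
}
```

In the question's setup, the @StreamListener method plays the role of the subscriber and `sendEmail().send(...)` plays the role of `send`, both on the channel named "email-notification".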

That said, the annotation-based configuration model has been deprecated. For the past several years we have been fully migrated to the functional programming model, which is much simpler and is also designed so that you do not have to think about internal implementation details such as channels, since they are really for internal use (a bridge between your code and the broker adapters).

There is also a new component, StreamBridge, that lets you send messages to a broker and is designed specifically for scenarios like yours.

Anyway, give it a look and consider refactoring your application. At the very least, make sure you send to a channel that is bound to a broker destination and subscribe to a different channel bound to the same destination, which ensures that the round trip to the broker happens.
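
As a rough sketch of the "two channels, one destination" advice, the bindings from the question could be split as shown below. The channel names `email-notification-out` and `email-notification-in` are assumptions made up for this example; the @Output and @Input values in NotificationProcessor, and the @StreamListener target, would have to be changed to match them. The remaining properties are copied from the question's configuration.

```yaml
spring:
  cloud:
    stream:
      bindings:
        # Hypothetical output channel name; would pair with @Output("email-notification-out")
        email-notification-out:
          destination: ${EMAIL_TOPIC:email-notification}
          contentType: application/json
          producer:
            partitionCount: 9
        # Hypothetical input channel name; would pair with @Input("email-notification-in")
        email-notification-in:
          destination: ${EMAIL_TOPIC:email-notification}
          group: ${EMAIL_GROUP:email-group-notification}
          contentType: application/json
          consumer:
            partitioned: true
            concurrency: 3
```

Because the two channels are now distinct objects, a message sent on the output channel has to pass through the binder to the Kafka topic before the input channel can receive it.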

Last but not least, I am still puzzled as to why you need to send to the broker and then subscribe to it in the same application. Why the networking overhead?

Upvotes: 0
