jj1978

Reputation: 11

Spring Cloud Stream Kafka Channel Not Working in Spring Boot Application

I have been trying to get an inbound SubscribableChannel and an outbound MessageChannel working in my Spring Boot application.

I have set up the Kafka channel and tested it successfully.

Furthermore, I have created a basic Spring Boot application that tests adding messages to the channel and receiving them from it.

The issue is that when I put the equivalent code into the application it belongs in, the messages never appear to be sent or received. It is hard to tell from debugging what is going on, but the only thing that looks different to me is the channel name: in the working implementation it is something like application.channel, while in the non-working app it is localhost:8080/channel.

Is there some Spring Boot configuration that could be blocking the creation of the channels, or binding them to a different channel source?

Has anyone had similar issues?

application.yml

spring:
  datasource:
    url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    platform: h2
    username: hello
    password: 
    driverClassName: org.h2.Driver    
  jpa:
    properties:
      hibernate:
        show_sql: true
        use_sql_comments: true
        format_sql: true

  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
          contentType: application/json
        email-out:
          destination: email
          contentType: application/json

Email

public class Email {

    private long timestamp;

    private String message;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

}

Binding Config

@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {    
}

Interface

public interface EmailQueues {

    String INPUT = "email-in";
    String OUTPUT = "email-out";

    @Input(INPUT)
    SubscribableChannel inboundEmails();

    @Output(OUTPUT)
    MessageChannel outboundEmails();
}

Controller

@RestController
@RequestMapping("/queue")
public class EmailQueueController {

    private EmailQueues emailQueues;

    @Autowired
    public EmailQueueController(EmailQueues emailQueues) {
        this.emailQueues = emailQueues;
    }

    @RequestMapping(value = "sendEmail", method = POST)
    @ResponseStatus(ACCEPTED)
    public void sendToQueue() {
        MessageChannel messageChannel = emailQueues.outboundEmails();
        Email email = new Email();
        email.setMessage("hello world: " + System.currentTimeMillis());
        email.setTimestamp(System.currentTimeMillis());

        messageChannel.send(MessageBuilder.withPayload(email)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }

    @StreamListener(EmailQueues.INPUT)
    public void handleEmail(@Payload Email email) {
        System.out.println("received: " + email.getMessage());
    }
}

I'm not sure whether one of the inherited configuration projects (Spring Cloud, Spring Cloud Sleuth) might be preventing it from working, but even when I remove them it still doesn't work. Unlike my basic application, which does work with the above code, I never see the ConsumerConfig being logged in the failing application, e.g.:

o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = consumer-2
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true

(This is what I see in the log of my basic Spring Boot application when running the above code; there the code works, writing to and reading from the Kafka channel.)

I assume some other Spring Boot configuration from one of the libraries I'm using is creating a different type of channel; I just cannot find what that configuration is.

Upvotes: 0

Views: 4176

Answers (2)

jj1978

Reputation: 11

Okay, so after a lot of debugging I discovered that something is creating a Test Support Binder (how, I don't know yet), which is presumably there so that tests do not add messages to a real channel.

After adding

@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)

With that in place the Kafka channel configuration works and messages are being added. It would be interesting to know what on earth is setting up this test support binder. I'll find that sucker eventually.
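One possible way to start tracking it down is to ask the JVM where the class comes from. The sketch below is only an illustration: BinderClasspathProbe and locate are made-up names, and the fully qualified class name is my assumption that the auto-configuration lives in the org.springframework.cloud.stream.test.binder package (as shipped in spring-cloud-stream-test-support), so check it against the import in your own code. It uses nothing but the JDK.

```java
import java.security.CodeSource;

class BinderClasspathProbe {

    // Assumed fully qualified name of the test binder auto-configuration;
    // verify it against the import used in the @SpringBootApplication exclude.
    static final String TEST_BINDER =
            "org.springframework.cloud.stream.test.binder.TestSupportBinderAutoConfiguration";

    /**
     * Returns the location (jar or directory) a class is loaded from,
     * a placeholder for JDK classes, or null if the class is not on the classpath.
     */
    static String locate(String className) {
        try {
            // 'false' avoids running static initializers; we only want the location.
            Class<?> type = Class.forName(
                    className, false, BinderClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            return source == null ? "(JDK runtime)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String location = locate(TEST_BINDER);
        System.out.println(location == null
                ? "test binder is not on the classpath"
                : "test binder loaded from: " + location);
    }
}
```

If the probe reports a jar, the build tool's dependency report (for example mvn dependency:tree, if the project uses Maven) should show which dependency pulls that jar in. A common cause, though I have not confirmed it here, is the test support artifact being declared without test scope somewhere in an inherited project.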

Upvotes: 0

Oleg Zhurakousky

Reputation: 6106

What you posted contains a lot of unrelated configuration, so it is hard to determine whether anything gets in the way. Also, when you say "..it appears that the messages never get sent or received..", are there any exceptions in the logs? Please also state which versions of Kafka and Spring Cloud Stream you're using. That said, I did try to reproduce it based on your code (after cleaning it up a bit to leave only the relevant parts) and was able to send and receive successfully.

My Kafka version is 0.11 and my Spring Cloud Stream version is 2.0.0. Here is the relevant code:

spring:   
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
        email-out:
          destination: email

@SpringBootApplication
@EnableBinding(KafkaQuestionSoApplication.EmailQueues.class)
public class KafkaQuestionSoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaQuestionSoApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(EmailQueues emailQueues) {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                emailQueues.outboundEmails().send(new GenericMessage<String>("Hello"));
            }
        };
    }

    @StreamListener(EmailQueues.INPUT)
    public void handleEmail(String payload) {
        System.out.println("received: " + payload);
    }


    public interface EmailQueues {
        String INPUT = "email-in";
        String OUTPUT = "email-out";

        @Input(INPUT)
        SubscribableChannel inboundEmails();

        @Output(OUTPUT)
        MessageChannel outboundEmails();
    }
}

Upvotes: 1
