Reputation: 11
I have been attempting to get an inbound SubscribableChannel and outbound MessageChannel working in my spring boot application.
I have successfully setup the kafka channel and tested it successfully.
Furthermore I have create a basic spring boot application that tests adding and receiving things from the channel.
The issue I am having is when I put the equivalent code in the application it belongs in, it appears that the messages never get sent or received. By debugging it's hard to ascertain what's going on but the only thing that looks different to me is the channel-name. In the working impl the channel name is like application.channel in the non working app its localhost:8080/channel.
I was wondering if there is some spring boot configuration blocking or altering the creation of the channels into a different channel source?
Anyone had any similar issues?
application.yml
spring:
datasource:
url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
platform: h2
username: hello
password:
driverClassName: org.h2.Driver
jpa:
properties:
hibernate:
show_sql: true
use_sql_comments: true
format_sql: true
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
email-in:
destination: email
contentType: application/json
email-out:
destination: email
contentType: application/json
public class Email {
private long timestamp;
private String message;
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
Binding Config
@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {
}
Interface
public interface EmailQueues {
String INPUT = "email-in";
String OUTPUT = "email-out";
@Input(INPUT)
SubscribableChannel inboundEmails();
@Output(OUTPUT)
MessageChannel outboundEmails();
}
Controller
@RestController
@RequestMapping("/queue")
public class EmailQueueController {
private EmailQueues emailQueues;
@Autowired
public EmailQueueController(EmailQueues emailQueues) {
this.emailQueues = emailQueues;
}
@RequestMapping(value = "sendEmail", method = POST)
@ResponseStatus(ACCEPTED)
public void sendToQueue() {
MessageChannel messageChannel = emailQueues.outboundEmails();
Email email = new Email();
email.setMessage("hello world: " + System.currentTimeMillis());
email.setTimestamp(System.currentTimeMillis());
messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
}
@StreamListener(EmailQueues.INPUT)
public void handleEmail(@Payload Email email) {
System.out.println("received: " + email.getMessage());
}
}
I'm not sure if one of the inherited configuration projects using Spring-Cloud, Spring-Cloud-Sleuth might be preventing it from working, but even when I remove it still doesnt. But unlike my application that does work with the above code I never see the ConsumeConfig being configured, eg:
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = consumer-2
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
(This configuration is what I see in my basic Spring Boot application when running the above code and the code works writing and reading from the kafka channel)....
I assume there is some over spring boot configuration from one of the libraries I'm using creating a different type of channel I just cannot find what that configuration is.
Upvotes: 0
Views: 4176
Reputation: 11
Okay so after a lot of debugging... I discovered that something is creating a Test Support Binder (how don't know yet) so obviously this is used to not impact add messages to a real channel.
After adding
@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)
The kafka channel configurations have worked and messages are adding.. would be interesting to know what on earth is setting up this test support binder.. I'll find that sucker eventually.
Upvotes: 0
Reputation: 6106
What you posted contains a lot of unrelated configuration, so hard to determine if anything gets in the way. Also, when you say "..it appears that the messages never get sent or received.." are there any exceptions in the logs? Also, please state the version of Kafka you're using as well as Spring Cloud Stream. Now, I did try to reproduce it based on your code (after cleaning up a bit to only leave relevant parts) and was able to successfully send/receive.
My Kafka version is 0.11 and Spring Cloud Stream 2.0.0. Here is the relevant code:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
email-in:
destination: email
email-out:
destination: email
@SpringBootApplication
@EnableBinding(KafkaQuestionSoApplication.EmailQueues.class)
public class KafkaQuestionSoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaQuestionSoApplication.class, args);
}
@Bean
public ApplicationRunner runner(EmailQueues emailQueues) {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) throws Exception {
emailQueues.outboundEmails().send(new GenericMessage<String>("Hello"));
}
};
}
@StreamListener(EmailQueues.INPUT)
public void handleEmail(String payload) {
System.out.println("received: " + payload);
}
public interface EmailQueues {
String INPUT = "email-in";
String OUTPUT = "email-out";
@Input(INPUT)
SubscribableChannel inboundEmails();
@Output(OUTPUT)
MessageChannel outboundEmails();
}
}
Upvotes: 1