Reputation: 1210
I am testing spring cloud stream with kafka binder, but got an error
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.;
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>1.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.2.RELEASE</version>
</dependency>
</dependencies>
The toy application is to imitate a secretary who transfer requests between employee and boss.
The interface for employee:
public interface SecretaryServingEmployee {
@Output
MessageChannel inbox();
@Input
SubscribableChannel rejected();
@Input
SubscribableChannel approved();
}
The interface for boss:
public interface SecretaryServingBoss {
@Input
SubscribableChannel inbox();
@Output
MessageChannel rejected();
@Output
MessageChannel approved();
}
application.properties
server.port=8080
spring.cloud.stream.bindings.inbox.destination=inbox
spring.cloud.stream.bindings.approved.destination=approved
spring.cloud.stream.bindings.rejected.destination=rejected
Employee.java
@EnableBinding(SecretaryServingEmployee.class)
@Component
public class Employee {
private static Logger logger = LoggerFactory.getLogger(Employee.class);
private SecretaryServingEmployee adminAssistent;
@Autowired
public Employee(SecretaryServingEmployee adminAssistent) {
this.adminAssistent = adminAssistent;
}
@InboundChannelAdapter(value = "inbox")
public String messageSource() {
return "You are handsome!!"; // This is the message sent to boss
}
@ServiceActivator(inputChannel="approved")
public void checkApproved(String message) {
logger.info(":-)");
}
@ServiceActivator(inputChannel="rejected")
public void checkRejected(RejectionLetter letter) {
logger.warn(":-(");
}
}
Boss.java
@EnableBinding(SecretaryServingBoss.class)
@Component
public class Boss {
private SecretaryServingBoss adminAssistent;
@Autowired
public Boss(SecretaryServingBoss adminAssistent) {
this.adminAssistent = adminAssistent;
}
@ServiceActivator(inputChannel="inbox")
public void sign(String content) {
if (content.contains("You are handsome")) {
adminAssistent.approved().send(message("nice work"));
}
else {
adminAssistent.rejected().send(message("Don't send me shit"));
}
}
private <T> Message<T> message(T content) {
return MessageBuilder.withPayload(content).build();
}
}
This is part of the trace
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.3.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.3.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.5.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.5.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:772) [spring-kafka-1.0.5.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_121]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_121]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_121]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
... 29 common frames omitted
Upvotes: 4
Views: 11438
Reputation: 409
I get this issue time to time for my Kafka producer. And the problem in my case was that the Kafka producer wasn't fully loaded up by Spring and my code was trying to send a msg using that producer object. A simple fix is to delay the sending of messages for initial few secs during start up or using a more spphisticated solution like countdown latch or spring events.
You want to see this sort of a line before your code starts sending msg to the kafka producer:
00:38:54.107 [Thread-4] INFO o.a.k.clients.producer.KafkaProducer - [Producer clientId=trade-data-producer-exasol-2] Instantiated an idempotent producer.
Upvotes: 1
Reputation: 53
Someone might benefit from my experience, battled with this issue all day until I saw @Ilayaperumal reply.
I inherited a groovy micro service, with a declared component scan, wondered why they did that;
For instance, by default @SpringBootApplication's component scan looks at >the classes under the same package where the @SpringBootApplication >annotated class is.
So I refactored the package without manually updating the component scan value as well and that was the genesis of my problem.
If you are experience the same problem check your package scanning.
Upvotes: 0
Reputation: 4179
It looks like your application classes aren't component scanned properly.
If you are running this as a Spring Boot application, can you make sure if you have the classes to be scanned are packaged correctly.
For instance, by default @SpringBootApplication
's component scan looks at the classes under the same package where the @SpringBootApplication
annotated class is.
Upvotes: 5