ccshih

Reputation: 1210

spring cloud stream kafka: "Dispatcher has no subscribers" error

I am testing Spring Cloud Stream with the Kafka binder, but I get this error:

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.;

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.0.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>1.1.2.RELEASE</version>
    </dependency>
</dependencies>

The toy application imitates a secretary who relays requests between an employee and the boss.

The interface for employee:

public interface SecretaryServingEmployee {
    @Output
    MessageChannel inbox();

    @Input
    SubscribableChannel rejected();

    @Input
    SubscribableChannel approved();

}

The interface for boss:

public interface SecretaryServingBoss {
    @Input
    SubscribableChannel inbox();

    @Output
    MessageChannel rejected();

    @Output
    MessageChannel approved();
}

application.properties

server.port=8080
spring.cloud.stream.bindings.inbox.destination=inbox
spring.cloud.stream.bindings.approved.destination=approved
spring.cloud.stream.bindings.rejected.destination=rejected

Employee.java

@EnableBinding(SecretaryServingEmployee.class)
@Component
public class Employee {
    private static Logger logger = LoggerFactory.getLogger(Employee.class);

    private SecretaryServingEmployee adminAssistent;

    @Autowired
    public Employee(SecretaryServingEmployee adminAssistent) {
        this.adminAssistent = adminAssistent;
    }

    @InboundChannelAdapter(value = "inbox")
    public String messageSource() {
        return "You are handsome!!";  // This is the message sent to boss
    }

    @ServiceActivator(inputChannel="approved")
    public void checkApproved(String message) {
        logger.info(":-)");
    }

    @ServiceActivator(inputChannel="rejected")
    public void checkRejected(RejectionLetter letter) {
        logger.warn(":-(");
    } 
}

Boss.java

@EnableBinding(SecretaryServingBoss.class)
@Component
public class Boss {
    private SecretaryServingBoss adminAssistent;

    @Autowired
    public Boss(SecretaryServingBoss adminAssistent) {
        this.adminAssistent = adminAssistent;
    }

    @ServiceActivator(inputChannel="inbox")
    public void sign(String content) {
        if (content.contains("You are handsome")) {
            adminAssistent.approved().send(message("nice work"));
        }
        else {
            adminAssistent.rejected().send(message("Don't send me shit"));
        }       
    }

    private <T> Message<T> message(T content) {
        return MessageBuilder.withPayload(content).build();
    }   

}

This is part of the stack trace:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) ~[spring-retry-1.1.3.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:172) ~[spring-retry-1.1.3.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.5.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:772) [spring-kafka-1.0.5.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_121]
    at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_121]
    at java.lang.Thread.run(Unknown Source) [na:1.8.0_121]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.3.1.RELEASE.jar:4.3.1.RELEASE]
    ... 29 common frames omitted

Upvotes: 4

Views: 11438

Answers (3)

saurabh.in

Reputation: 409

I get this issue from time to time with my Kafka producer. In my case, the problem was that Spring had not finished initializing the Kafka producer when my code tried to send a message through it. A simple fix is to delay sending for the first few seconds after startup; a more sophisticated one is to gate the sends with a CountDownLatch or Spring events.

You want to see a line like this in the log before your code starts sending messages to the Kafka producer:

00:38:54.107 [Thread-4] INFO o.a.k.clients.producer.KafkaProducer - [Producer clientId=trade-data-producer-exasol-2] Instantiated an idempotent producer.
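
The latch idea could look roughly like the sketch below. It is plain Java with no Spring or Kafka in it, and GatedSender, markReady and send are hypothetical names made up for illustration; in a real application you would call markReady() from whatever hook tells you the producer is up, and the list would be replaced by the actual send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sender that refuses to send until it is told the producer is ready. */
final class GatedSender {
    private final CountDownLatch ready = new CountDownLatch(1);
    private final List<String> sent = new ArrayList<>(); // stand-in for the real producer

    /** Call once the producer is initialized, e.g. from a startup listener (assumption). */
    void markReady() {
        ready.countDown();
    }

    /** Waits up to timeoutMillis for readiness; returns false without sending on timeout. */
    boolean send(String payload, long timeoutMillis) {
        try {
            if (!ready.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false; // producer still not ready, caller decides what to do
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        synchronized (sent) {
            sent.add(payload);
        }
        return true;
    }

    /** Snapshot of everything that was actually sent. */
    List<String> sentMessages() {
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }

    public static void main(String[] args) {
        GatedSender sender = new GatedSender();
        System.out.println(sender.send("early", 50)); // false: latch is still closed
        sender.markReady();
        System.out.println(sender.send("hello", 50)); // true: latch is open
    }
}
```

The timeout keeps a caller from blocking forever if the producer never comes up; whether to retry, drop, or fail on a false return depends on your application.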

Upvotes: 1

Kinglee

Reputation: 53

Someone might benefit from my experience: I battled with this issue all day until I saw @Ilayaperumal's reply.

I inherited a Groovy microservice with an explicitly declared component scan and wondered why it was there. The reply explains it:

"For instance, by default @SpringBootApplication's component scan looks at the classes under the same package where the @SpringBootApplication annotated class is."

I had refactored the packages without updating the component scan value to match, and that was the root of my problem.

If you are experiencing the same problem, check your package scanning.

Upvotes: 0

Ilayaperumal Gopinathan

Reputation: 4179

It looks like your application classes aren't component scanned properly.

If you are running this as a Spring Boot application, make sure the classes that need to be scanned (here, Employee and Boss) are in packages that the component scan actually covers.

For instance, by default @SpringBootApplication's component scan looks at the classes under the same package where the @SpringBootApplication annotated class is.
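
To make the package rule concrete, here is a minimal plain-Java sketch of the check you can do by eye. It only mirrors the prefix rule (the application class's package and its sub-packages are covered); it is not Spring's scanner, and ScanCheck, isCovered and the com.example package names are hypothetical.

```java
/** Illustrates which packages a default component scan would cover (sketch only). */
final class ScanCheck {

    /** True if candidatePackage is basePackage itself or one of its sub-packages. */
    static boolean isCovered(String basePackage, String candidatePackage) {
        return candidatePackage.equals(basePackage)
                || candidatePackage.startsWith(basePackage + ".");
    }

    public static void main(String[] args) {
        // Assumption: the @SpringBootApplication class lives in com.example.app.
        String base = "com.example.app";
        System.out.println(isCovered(base, "com.example.app.office")); // true: sub-package
        System.out.println(isCovered(base, "com.example.office"));     // false: sibling package
    }
}
```

If Employee or Boss ends up in a package where this check is false, the @EnableBinding and @ServiceActivator annotations on them are never processed, so nothing subscribes to the bound channels. Moving the classes under the application class's package, or listing the extra package explicitly (for example through the scanBasePackages attribute of @SpringBootApplication), should fix that.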

Upvotes: 5
