minizibi
minizibi

Reputation: 671

Dispatcher has no subscribers for channel - spring-cloud-stream-kafka

after upgrading to Spring Boot 2, Reactor 3.5, kafka-binder 2.0.0 RELEASE, and kafka-client 1.0.1 one of modules doesn't works. I spent 5 days to make it and read probably all related topics but cannot find reason of this behavior.

Main class:

@Slf4j
@EnableI18N
@EnableSideBar
@ComponentScan
@SpringBootConfiguration
@EnableConfigurationProperties
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class View
{
    public static void main(String[] args)
    {
        new SpringApplicationBuilder(View.class)
            .profiles("production")
            .bannerMode(Banner.Mode.OFF)
            .headless(true)
            .application()
            .run(args);

        log.info("\nhttp://localhost:8083/\n");
    }
}

Configuration marker:

@Configuration
@Profile("production")
@EnableBinding(OffersChannel.class)
class ProductionOffersConfiguration
{
}

Channel interface:

public interface OffersChannel
{
    String OFFERS_OBTAIN = "offersObtain";
    String OFFERS_OBTAIN_REQUEST= "offersObtainRequest";

    @Input(OFFERS_OBTAIN)
    SubscribableChannel offersChannel();

    @Output(OFFERS_OBTAIN_REQUEST)
    MessageChannel offersRequestChannel();
}

And AdminUI class, what is important before updating dependencies, when this class was initialized I was able to see a log in console which says that I subscribed to channel, now nothing happens:

@Slf4j
@Push
@Theme("${view.default-theme}")
@SpringUI(path = WebsiteMapping.ADMIN)
@RequiredArgsConstructor
public class AdminUI extends UI
{
    MessageChannel offersObtainRequest;
    Grid<Ad> adGrid = createAdGrid();
    private List<Ad> ads = new LinkedList<>();
    ConnectionService connectionService;

    @Override
    @SneakyThrows
    protected void init(VaadinRequest vaadinRequest)
    {
        setContent(splitPane());
        adGrid.setItems(ads);
    }

    private VerticalSplitPanel splitPane()
    {
        VerticalSplitPanel verticalSplitPanel = new VerticalSplitPanel();
        verticalSplitPanel.setSplitPosition(10, Unit.PERCENTAGE);
        verticalSplitPanel.setFirstComponent(buttonsLayout());
        verticalSplitPanel.setSecondComponent(adGrid);

        return verticalSplitPanel;
    }

    private Layout buttonsLayout()
    {
        HorizontalLayout layout = new HorizontalLayout();
        layout.setMargin(true);

        layout.addComponent(requestMoreOffersButton());

        ThemeSelectorComboBox themeSelectorComboBox = new ThemeSelectorComboBox();
        layout.addComponent(themeSelectorComboBox);
        layout.setComponentAlignment(themeSelectorComboBox, Alignment.MIDDLE_RIGHT);

        return layout;
    }

    @PostConstruct
    private void createGridProperties()
    {
        adGrid.setSizeFull();
        adGrid.addColumn(Ad::getTitle).setCaption("Title");
        adGrid.addColumn(Ad::getLocation).setCaption("Location");
        adGrid.addColumn(Ad::getHref).setCaption("Href");
    }

    @StreamListener
    public void fetchAdsFrom(@Input(OffersChannel.OFFERS_OBTAIN) Flux<Ad> fluxAd)
    {
        fluxAd.subscribe(this::displayOfferInGrid);
    }

    private void displayOfferInGrid(Ad ad)
    {
        ads.add(ad);
        adGrid.setItems(ads);
    }

    private Button requestMoreOffersButton()
    {
        return new Button("Request 10 more offers", this::requestMoreOffers);
    }

    private Button startServiceButton(String caption, String url, String message)
    {
        return new Button(caption, buttonClickedEvent -> startService(url, message));
    }

    private void startService(String url, String message)
    {
        ConnectionRequest startProviderRequest = ConnectionRequest
            .builder()
            .url(url)
            .build();

        connectionService
            .getForHtml(startProviderRequest)
            .thenAccept(serviceStarted -> Notification.show(message));
    }

    private void requestMoreOffers(Event event)

    {
        offersObtainRequest.send(new GenericMessage(new AdBroadcastRequest(ads.size(),10)));
    }

    private Grid<Ad> createAdGrid()
    {
        return new Grid<>();
    }
}

Application.yml:

spring:
    cloud:
        stream:
            bindings:
                offersObtainRequest:
                   destination: adsBroadcastRequestes
                   binder: kafka
                   group: adsBroadcastRequestsProducer
                offersObtain:
                   destination: adsBroadcast
                   binder: kafka
                   group: adsConsumer

stacktrace:

2018-04-07 11:27:12.010 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=0
2018-04-07 11:27:12.010 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:12.010 DEBUG o.s.r.backoff.ExponentialBackOffPolicy  : Sleeping for 1000
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=1
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=1
2018-04-07 11:27:13.011 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:13.011 DEBUG o.s.r.backoff.ExponentialBackOffPolicy  : Sleeping for 2000
2018-04-07 11:27:13.909 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Sending Heartbeat request to coordinator Kacper-PC:9092 (id: 2147483647 rack: null)
2018-04-07 11:27:14.138 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Received successful Heartbeat response
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=2
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=2
2018-04-07 11:27:15.011 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=3
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Retry failed last attempt: count=3
2018-04-07 11:27:15.012 DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer  : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.offersObtain'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) [spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    ... 23 common frames omitted

Upvotes: 3

Views: 13954

Answers (4)

RAM237
RAM237

Reputation: 1035

I've spent hours trying to identify the real issue behind those "Dispatcher has no subscribers" error.

In my case the problem was Kafka storing some historical node id (1001) while all topics were using 1003 as a leader id, and that's why messages were unable to be sent. Had to wipe all this manually to make it work.

While it perfectly worked for my local setup as a quick fix, it is unlikely to be the proper way on a production environment. For permanent solution (setting static id for your broker), you can try to check out this answer.

Upvotes: 0

anjey
anjey

Reputation: 93

You can use the @StreamListener annotation but with a target parameter inside the annotation.

@StreamListener(target = OffersChannel.OFFERS_OBTAIN)
public void fetchAdsFrom(@Payload Flux<Ad> fluxAd, @Header final String someHeader)
{
    fluxAd.subscribe(this::displayOfferInGrid);
}

Did work for me, similar setup to yours (instead of Flux i receive a JSON - String).

It could be that you have to write your own converter for the Flux Message, i am not sure.

Upvotes: 0

panser
panser

Reputation: 2139

in my case I declared spring-cloud-steam interface with @Input channel

 @Input("in")
 SubscribableChannel inbound();

but I didn't use any @StreamListener to handle this target, so as result I got such error

MessageDeliveryException: Dispatcher has no subscribers for channel

Solution: I removed @Input declaration, and leave just @Output("out") one.

FYI: my input configured in other separate service.

Upvotes: 3

Oleg Zhurakousky
Oleg Zhurakousky

Reputation: 6126

Well, there is a lot going on in there. I mean all the non-Spring-Cloud-Stream stuff. . . makes it hard to follow. In any event, your @StreamListener is not defined inside of any Spring managed configuration classes, so it doesn't get picked up. You can move it to ProductionOffersConfiguration or View or any other Spring managed configuration classes.

Also, consider going through this quick tutorial https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start (5 min) to better understand the mechanics of spring-cloud-stream

Upvotes: 4

Related Questions