bluth
bluth

Reputation: 481

ServiceActivator 'randomly' unsubscribing from PublishSubscribeChannel

I have a method annotated with @ServiceActivator("CH1"), where "CH1" definition is:

  @Bean(name = "CH1")
  MessageChannel ch1() {
    return new PublishSubscribeChannel
  }

and other PollableChannels publishing to this channel via

@BidgeTo(value = "CH1", poller = @Poller("myPoller"))

Things seem to work fine most of the time; however, seemingly randomly the message handler unsubscribes from "CH1" and I see in the logs:

[DEBUG] (pool-2-thread-1) org.springframework.integration.dispatcher.BroadcastingDispatcher: No subscribers, default behavior is ignore

Now I know I can change the minSubscribers but I don't get why things seem to randomly unsubscribe? After this error it will go back to handling some messages fine. Does a message handler unsubscribe while handling messages or if the executor being used is full? I see no errors associated with this in the log nor and unsubscribe or update to subscriber counts to "CH1" in the logs.

Upvotes: 1

Views: 126

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121542

That does not make sense. Please, share some test-case to reproduce from the Framework perspective.

The source code on the matter looks like:

if (dispatched == 0 && this.minSubscribers == 0 && logger.isDebugEnabled()) {
    if (sequenceSize > 0) {
        logger.debug("No subscribers received message, default behavior is ignore");
    }
    else {
        logger.debug("No subscribers, default behavior is ignore");
    }
}

where we can go to the sequence == 0 only in case of:

Collection<MessageHandler> handlers = this.getHandlers();
if (this.requireSubscribers && handlers.size() == 0) {
    throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
}
int sequenceSize = handlers.size();

Only the clue that your subscribers unsubscribes somehow...

I see that you have a DEBUG for your CH1, so would you mind to share DEBUG logs for entire org.springframework.integration when you see that error.

EDIT

Also note that whenever a subscriber is added/removed (e.g. when a consuming endpoint is started/stopped), you will see this log message...

if (logger.isInfoEnabled()) {
    logger.info("Channel '" + this.getFullChannelName() + "' has " + counter + " subscriber(s).");
}

(when logging with at least INFO logging).

Upvotes: 2

Related Questions