Michael Hegner

Reputation: 5823

Spring Integration: broadcast on a publish-subscribe-channel seems not to work when wiretapping

I am sending a message to an int:publish-subscribe-channel, but when I wiretap this channel only one of the subscribers is called (as it would be with a direct-channel). When I comment out the wiretap, it works as expected. I am using Spring Boot 2.0.0.RC1; the problem can also be reproduced with Spring Boot 1.5.10.RELEASE.

The following configuration works as expected:

Start-Code:

ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
PublishSubscribeChannel subscribeChannel = context.getBean("subscriber.channel", PublishSubscribeChannel.class);
subscribeChannel.send(MessageBuilder.withPayload("Test-String").build());
context.close();

Configuration:

<int:publish-subscribe-channel id="subscriber.channel" />

<int:logging-channel-adapter channel="subscriber.channel" level="WARN" />
<int:logging-channel-adapter channel="subscriber.channel" level="ERROR" />

Output:

2018-02-07 16:08:51.308  WARN 11944 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String
2018-02-07 16:08:51.315 ERROR 11944 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String

When I add a channel interceptor, the channel is no longer a publish-subscribe-channel but a direct-channel:

Start-Code:

ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
PublishSubscribeChannel subscribeChannel = context.getBean("subscriber.channel", PublishSubscribeChannel.class);
subscribeChannel.send(MessageBuilder.withPayload("Test-String").build());
context.close();

Configuration:

<int:publish-subscribe-channel id="subscriber.channel" />

<int:logging-channel-adapter channel="subscriber.channel" level="WARN" />
<int:logging-channel-adapter channel="subscriber.channel" level="ERROR" />

<int:channel id="subscriber.channel">
    <int:interceptors>
        <int:wire-tap channel="incomeLogger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="incomeLogger" level="INFO" />

Stacktrace:

Exception in thread "main" 2018-02-07 16:14:42.824  INFO 11928 --- [       Thread-3] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@667a738: startup date [Wed Feb 07 16:14:41 CET 2018]; root of context hierarchy
org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'subscriber.channel' is expected to be of type 'org.springframework.integration.channel.PublishSubscribeChannel' but was actually of type 'org.springframework.integration.channel.DirectChannel'
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:384)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:205)
at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1091)
at com.example.demo.DemoApplication.main(DemoApplication.java:17)

When I change my start code to:

ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
MessageChannel subscribeChannel = context.getBean("subscriber.channel", MessageChannel.class);
subscribeChannel.send(MessageBuilder.withPayload("Test-String").build());
context.close();

Then only one "subscriber" is called, and the message is also logged by the wiretap interceptor:

2018-02-07 16:17:11.415  INFO 10040 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String
2018-02-07 16:17:11.416  WARN 10040 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String

UPDATE:

What also works is the following configuration:

<int:publish-subscribe-channel id="subscriber.channel">
    <int:interceptors>
        <int:wire-tap channel="incomeLogger"/>
    </int:interceptors>
</int:publish-subscribe-channel>

<int:logging-channel-adapter id="incomeLogger" level="INFO" />
<int:logging-channel-adapter channel="subscriber.channel" level="WARN" />
<int:logging-channel-adapter channel="subscriber.channel" level="ERROR" />

Output:

2018-02-07 17:08:56.923  INFO 11300 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String
2018-02-07 17:08:56.923  WARN 11300 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String
2018-02-07 17:08:56.923 ERROR 11300 --- [           main] o.s.integration.handler.LoggingHandler   : Test-String

Is this expected or a bug? Or what am I missing? Thanks in advance.

Upvotes: 0

Views: 972

Answers (1)

Artem Bilan

Reputation: 121337

Look at what you have:

<int:publish-subscribe-channel id="subscriber.channel" />

<int:channel id="subscriber.channel">
    <int:interceptors>
        <int:wire-tap channel="incomeLogger"/>
    </int:interceptors>
</int:channel>

Effectively, you declare two beans with the same id, so only the second one in definition order wins. Since that one is a plain <int:channel>, you end up with a DirectChannel at runtime, and the <int:publish-subscribe-channel> definition never takes effect.
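
To make the "last definition wins" point concrete, here is a toy sketch in plain Java. It is only an analogy: the class BeanOverrideSketch and its register method are hypothetical and are not Spring's actual bean registry. It assumes that definitions are keyed by id and that a later definition silently replaces an earlier one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanOverrideSketch {

    // Hypothetical stand-in for a bean registry: each entry is {id, type}.
    static Map<String, String> register(String[][] definitions) {
        Map<String, String> registry = new LinkedHashMap<>();
        for (String[] definition : definitions) {
            // Same id as an earlier entry: the later definition replaces it.
            registry.put(definition[0], definition[1]);
        }
        return registry;
    }

    public static void main(String[] args) {
        Map<String, String> registry = register(new String[][] {
            {"subscriber.channel", "PublishSubscribeChannel"}, // <int:publish-subscribe-channel>
            {"subscriber.channel", "DirectChannel"},           // <int:channel>, declared later
        });
        // Only one entry survives, and it is the DirectChannel.
        System.out.println(registry);
    }
}
```

This mirrors the BeanNotOfRequiredTypeException in the question: a lookup by id "subscriber.channel" finds a DirectChannel, not a PublishSubscribeChannel.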

You should understand that the XML configuration is parsed first, and the resulting object model may differ from what the XML seems to declare.

So, your mistake is that you override the target bean by reusing the same id.

Your latest configuration, with the <int:interceptors> inside <int:publish-subscribe-channel id="subscriber.channel">, is correct. And I'm sure the latest logs are exactly what you would like to see.
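
If you prefer Java configuration, the same idea can be sketched roughly as follows. This is only a sketch, assuming Spring Integration is on the classpath; the class name SubscriberChannelConfig and the handler bean names are made up for illustration. The point is the same: the wire tap is added to the one and only PublishSubscribeChannel bean, with no second bean sharing its id.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class SubscriberChannelConfig {

    // Target channel of the wire tap.
    @Bean
    public MessageChannel incomeLogger() {
        return new DirectChannel();
    }

    // A single channel bean with this id; the interceptor is attached to it directly.
    @Bean(name = "subscriber.channel")
    public PublishSubscribeChannel subscriberChannel() {
        PublishSubscribeChannel channel = new PublishSubscribeChannel();
        channel.addInterceptor(new WireTap(incomeLogger()));
        return channel;
    }

    // Logs every wire-tapped message at INFO.
    @Bean
    @ServiceActivator(inputChannel = "incomeLogger")
    public LoggingHandler incomeLoggingHandler() {
        return new LoggingHandler(LoggingHandler.Level.INFO);
    }

    // First subscriber of the publish-subscribe channel.
    @Bean
    @ServiceActivator(inputChannel = "subscriber.channel")
    public LoggingHandler warnLoggingHandler() {
        return new LoggingHandler(LoggingHandler.Level.WARN);
    }

    // Second subscriber of the publish-subscribe channel.
    @Bean
    @ServiceActivator(inputChannel = "subscriber.channel")
    public LoggingHandler errorLoggingHandler() {
        return new LoggingHandler(LoggingHandler.Level.ERROR);
    }
}
```

With this in place, context.getBean("subscriber.channel", PublishSubscribeChannel.class) from the question's start code should resolve to the expected type.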

Upvotes: 1
