wearebob
wearebob

Reputation: 415

Spring cloud stream: dynamic output channel strange behavior

I am using spring cloud stream version 2.1.0.RELEASE to send messages (in this case to Kafka) to channels dynamically defined based on the input received. The issue is that only every other message ends up in the correct channel, the other half end up in the default channel.

I used this sample as a starting point.

I am placing the channel I want to send to into a specific message header, then using a HeaderValueRouter to check that same header value to see which channel to output to.

I am configuring my application as follows:

@EnableBinding(CloudStreamConfig.DynamicSource.class)
public class CloudStreamConfig {

    @Autowired
    private BinderAwareChannelResolver resolver;

    public static final String CHANNEL_HEADER = "channelHeader";
    public static final String OUTPUT_CHANNEL = "outputChannel";

    private final String defaultChannel = "defaultChannel";

    @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
    @Bean
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(CHANNEL_HEADER);
        router.setDefaultOutputChannelName(defaultChannel);
        router.setChannelResolver(resolver);
        return router;
    }

    public interface DynamicSource {
        @Output(OUTPUT_CHANNEL)
        MessageChannel output();
    }

}

And in my controller I take in an object as well as a parameter defining what channel to send it to, then send it to the MessageChannel. The relevant code is below:

    @Autowired
    @Qualifier(CloudStreamConfig.OUTPUT_CHANNEL)
    public MessageChannel localChannel;

    ...

    @GetMapping(path = "/error/{channel}")
    @ResponseStatus(HttpStatus.OK)
    public void error(@PathVariable String channel) {
        // build my object
        Message message = MessageBuilder.createMessage(myObject,
                new MessageHeaders(Collections.singletonMap(CloudStreamConfig.CHANNEL_HEADER, channel)));
        localChannel.send(message);
    }

If I send 10 messages to /error/someChannel I would expect to see 10 messages in someChannel. However, I see half of the messages in someChannel and the other half in defaultChannel. I have put a debugging counter variable in my messages and it sends the first message to the correct channel, and then every second message to the correct channel, while the others all go to the default channel.

What would be causing this and how can I fix it? Am I misusing my DynamicSource class? I assumed it would be tied to any autowired MessageChannel of the same name (and it does appear to be) but I'm wondering if there's something I'm missing. Or is there an unintended interaction with the BinderAwareChannelResolver? (I honestly don't know what this does, I only included it because the samples do)

Upvotes: 1

Views: 865

Answers (1)

Gary Russell
Gary Russell

Reputation: 174719

There are two subscribers on the output channel - the channel binding (in the binder) and your router.

For DirectChannels, the default dispatching algorithm is round robin so you are sending messages alternately to the router and directly to the binder.

You need a different DirectChannel @Bean for the service activator so all messages go there, and thence to the binder after routing.

See sourceChannel in that sample.

Upvotes: 2

Related Questions