I am using Spring Cloud Stream version 2.1.0.RELEASE to send messages (in this case to Kafka) to channels that are defined dynamically based on the input received. The issue is that only every other message ends up in the correct channel; the other half end up in the default channel.
I used this sample as a starting point.
I am placing the channel I want to send to into a specific message header, then using a HeaderValueRouter to check that same header value to see which channel to output to.
I am configuring my application as follows:
@EnableBinding(CloudStreamConfig.DynamicSource.class)
public class CloudStreamConfig {

    @Autowired
    private BinderAwareChannelResolver resolver;

    public static final String CHANNEL_HEADER = "channelHeader";
    public static final String OUTPUT_CHANNEL = "outputChannel";
    private final String defaultChannel = "defaultChannel";

    @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
    @Bean
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(CHANNEL_HEADER);
        router.setDefaultOutputChannelName(defaultChannel);
        router.setChannelResolver(resolver);
        return router;
    }

    public interface DynamicSource {
        @Output(OUTPUT_CHANNEL)
        MessageChannel output();
    }
}
And in my controller I take in an object as well as a parameter defining what channel to send it to, then send it to the MessageChannel. The relevant code is below:
@Autowired
@Qualifier(CloudStreamConfig.OUTPUT_CHANNEL)
public MessageChannel localChannel;
...
@GetMapping(path = "/error/{channel}")
@ResponseStatus(HttpStatus.OK)
public void error(@PathVariable String channel) {
    // build my object
    Message message = MessageBuilder.createMessage(myObject,
            new MessageHeaders(Collections.singletonMap(CloudStreamConfig.CHANNEL_HEADER, channel)));
    localChannel.send(message);
}
If I send 10 messages to /error/someChannel, I would expect to see 10 messages in someChannel. However, I see half of the messages in someChannel and the other half in defaultChannel. I have put a debugging counter variable in my messages: the first message goes to the correct channel, and then every second message after it, while the others all go to the default channel.
What would be causing this and how can I fix it? Am I misusing my DynamicSource class? I assumed it would be tied to any autowired MessageChannel of the same name (and it does appear to be), but I'm wondering if there's something I'm missing. Or is there an unintended interaction with the BinderAwareChannelResolver? (I honestly don't know what this does; I only included it because the samples do.)
Reputation: 174719
There are two subscribers on the output channel - the channel binding (in the binder) and your router.
For DirectChannels, the default dispatching algorithm is round robin, so you are sending messages alternately to the router and directly to the binder.
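To make the alternation concrete, here is a toy model in plain Java. It is not Spring code: ToyDirectChannel and RoundRobinDemo are hypothetical names, and the channel only imitates the one relevant behaviour (each message is handed to exactly one subscriber, in rotation). The subscriber order is an assumption chosen to match the question, where the first message is routed correctly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a point-to-point channel: every message is delivered to
// exactly ONE subscriber, picked in round-robin order. Hypothetical class.
final class ToyDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void send(String message) {
        Consumer<String> target = subscribers.get(next);
        next = (next + 1) % subscribers.size();
        target.accept(message);
    }
}

final class RoundRobinDemo {

    // One channel shared by the router and the binding, as in the question.
    // Returns {messages seen by the router, messages that bypassed it}.
    static int[] sharedChannel(int count) {
        int[] counts = new int[2];
        ToyDirectChannel output = new ToyDirectChannel();
        output.subscribe(m -> counts[0]++); // router: would honour the header
        output.subscribe(m -> counts[1]++); // binding: skips the router entirely
        for (int i = 0; i < count; i++) {
            output.send("msg-" + i);
        }
        return counts;
    }

    // A dedicated channel whose only subscriber is the router.
    static int[] dedicatedChannel(int count) {
        int[] counts = new int[2];
        ToyDirectChannel routerInput = new ToyDirectChannel();
        routerInput.subscribe(m -> counts[0]++); // router is the sole subscriber
        for (int i = 0; i < count; i++) {
            routerInput.send("msg-" + i);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sharedChannel(10)));    // prints [5, 5]
        System.out.println(Arrays.toString(dedicatedChannel(10))); // prints [10, 0]
    }
}
```

With two subscribers, half of the messages never reach the router, which matches the 5/5 split described in the question; with a single subscriber, all of them do.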
You need a different DirectChannel @Bean as the input channel for the service activator, so that all messages go to the router first and reach the binder only after routing.
See sourceChannel in that sample.
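A minimal sketch of that change applied to the configuration from the question, assuming Spring Cloud Stream 2.1.x with Spring Integration on the classpath. The routerChannel name and the ROUTER_CHANNEL constant are made up for illustration. Dropping the DynamicSource binding is also an assumption: nothing sends to outputChannel any more, and the resolver is expected to bind the routed destinations on demand.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.router.HeaderValueRouter;
import org.springframework.messaging.MessageChannel;

@EnableBinding
public class CloudStreamConfig {

    public static final String CHANNEL_HEADER = "channelHeader";
    // Hypothetical name: an unbound channel that feeds only the router.
    public static final String ROUTER_CHANNEL = "routerChannel";
    private static final String DEFAULT_CHANNEL = "defaultChannel";

    @Autowired
    private BinderAwareChannelResolver resolver;

    // Plain DirectChannel that is not declared as a binding, so the router
    // is its only subscriber and round robin has nothing to alternate with.
    @Bean(name = ROUTER_CHANNEL)
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    // The router reads the header and asks the resolver for the destination
    // channel; messages without a usable header go to DEFAULT_CHANNEL.
    @Bean
    @ServiceActivator(inputChannel = ROUTER_CHANNEL)
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(CHANNEL_HEADER);
        router.setDefaultOutputChannelName(DEFAULT_CHANNEL);
        router.setChannelResolver(resolver);
        return router;
    }
}
```

The controller would then inject the new channel with @Qualifier(CloudStreamConfig.ROUTER_CHANNEL) instead of OUTPUT_CHANNEL and keep calling send(message) as before.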