Reputation: 689
I have a use-case where I need to produce to multiple Kafka topics/destinations determined at runtime. I tried to combine Functions with multiple input and output arguments by using a returning Flux<Message<T>>
from a functional bean of type Function
with setting the header spring.cloud.stream.sendto.destination
for each Message
as described here. I came up with the following implementation:
@Bean
public Function<Person, Flux<Message<Person>>> route() {
return person -> Flux.fromIterable(Stream.of(person.getEvents())
.map(e -> MessageBuilder.withPayload(person)
.setHeader("spring.cloud.stream.sendto.destination", e).build())
.collect(Collectors.toList()));
}
and I have also this in my config:
spring.cloud.stream.dynamic-destinations=
This is my Person
:
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
private String[] events;
private String name;
}
events
contains the list of Kafka topic names.
However, it doesn't work. What am I'm missing?
Upvotes: 1
Views: 2434
Reputation: 5904
spring.cloud.stream.sendto.destination
uses BinderAwareChannelResolver
internally which is deprecated in favor of StreamBridge
. I think you can rewrite your code as below. I haven't tested it, but here is the template:
@Autowired StreamBridge streamBridge;
@Bean
public Consumer<Person> route() {
return person -> streamBridge.send(person.getName(), person);
}
Behind the scenes, Spring Cloud Stream will create a binding for Person
dynamically.
If you know your destinations in advance at deployment time, you can also set them through configuration. For e.g. spring.cloud.stream.source
as foo;bar..;...
. Then the framework creates output bindings in the form of foo-out-0
, bar-out-0
etc. Then you need to set destinations - spring.cloud.stream.bindings.foo-out-0.destination=foo
. But since your use case is strictly about dynamic destinations, you can't go with this approach, rather try using what I suggested above.
Upvotes: 5
Reputation: 689
One solution that works uses BinderAwareChannelResolver
. However, it's deprecated in favor if providing spring.cloud.stream.sendto.destination
property in 3.0.
@Autowired
private BinderAwareChannelResolver binderAwareChannelResolver;
@Bean
public Consumer<Person> route() {
return person ->
Stream.of(person.getEvents())
.forEach(e -> binderAwareChannelResolver.resolveDestination(e)
.send(MessageBuilder.withPayload(person).build()));
}
I don't like this solution because it combines the function-based programming model with the "legacy-style" programming model. If anyone has a better solution, please feel free to comment/answer.
Upvotes: 1