cannot_mutably_borrow
cannot_mutably_borrow

Reputation: 689

Spring Cloud Stream - route to multiple dynamic destinations at runtime

I have a use-case where I need to produce to multiple Kafka topics/destinations determined at runtime. I tried to combine Functions with multiple input and output arguments by using a returning Flux<Message<T>> from a functional bean of type Function with setting the header spring.cloud.stream.sendto.destination for each Message as described here. I came up with the following implementation:

@Bean
public Function<Person, Flux<Message<Person>>> route() {
    return person -> Flux.fromIterable(Stream.of(person.getEvents())
            .map(e -> MessageBuilder.withPayload(person)
                    .setHeader("spring.cloud.stream.sendto.destination", e).build())
            .collect(Collectors.toList()));
}

and I have also this in my config:

spring.cloud.stream.dynamic-destinations=

This is my Person:

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
    private String[] events;
    private String name;
}

events contains the list of Kafka topic names.

However, it doesn't work. What am I'm missing?

Upvotes: 1

Views: 2434

Answers (2)

sobychacko
sobychacko

Reputation: 5904

spring.cloud.stream.sendto.destination uses BinderAwareChannelResolver internally which is deprecated in favor of StreamBridge. I think you can rewrite your code as below. I haven't tested it, but here is the template:

@Autowired StreamBridge streamBridge;

@Bean
public Consumer<Person> route() {
    return person -> streamBridge.send(person.getName(), person);
}

Behind the scenes, Spring Cloud Stream will create a binding for Person dynamically.

If you know your destinations in advance at deployment time, you can also set them through configuration. For e.g. spring.cloud.stream.source as foo;bar..;.... Then the framework creates output bindings in the form of foo-out-0, bar-out-0 etc. Then you need to set destinations - spring.cloud.stream.bindings.foo-out-0.destination=foo. But since your use case is strictly about dynamic destinations, you can't go with this approach, rather try using what I suggested above.

Upvotes: 5

cannot_mutably_borrow
cannot_mutably_borrow

Reputation: 689

One solution that works uses BinderAwareChannelResolver. However, it's deprecated in favor if providing spring.cloud.stream.sendto.destination property in 3.0.

@Autowired
private BinderAwareChannelResolver binderAwareChannelResolver;

@Bean
public Consumer<Person> route() {
return person ->
        Stream.of(person.getEvents())
                .forEach(e -> binderAwareChannelResolver.resolveDestination(e)
                        .send(MessageBuilder.withPayload(person).build()));
}

I don't like this solution because it combines the function-based programming model with the "legacy-style" programming model. If anyone has a better solution, please feel free to comment/answer.

Upvotes: 1

Related Questions