Christopher Wong

Reputation: 260

What's a simple way to initiate messages in spring-cloud-stream?

I just joined a project that is using Spring Cloud Stream as a wrapper around Kafka. The idea is that we'd abstract the messaging API so we're free to switch messaging platforms. I find the API baffling, especially with the latest round of deprecations that redirect me from the annotation-based to a functional model based on Spring Cloud Function. I feel like I'm missing something, because the prescribed programming model seems to make the simple act of producing a message quite a pain. Instead of something like kafkaTemplate.sendDefault("Hello"), we have prescribed monstrosities like:

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    // ignore
                }
                // returned on every path so the method compiles
                return "Hello from Supplier";
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }

How am I supposed to do simple message-driven code with this kind of API?

Upvotes: 0

Views: 617

Answers (2)

Artem Bilan

Reputation: 121507

I would say it is not correct to compare producing an infinite source of data with a single kafkaTemplate.send(). If a single send is all the functionality you need in your logic, consider using a StreamBridge: https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources.

The Supplier<Flux> bean is a special signal for the framework to start the source's data-producing logic. If you did that yourself, it would not be so straightforward. And that is before mentioning that your Flux.fromStream(Stream.generate()) could be replaced with a single Flux.generate(). It is also not clear why one would use a special scheduler and share()...

There is also the @PollableBean approach if you don't want to write complex Flux logic: https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_suppliers_sources

Upvotes: 1

Gary Russell

Reputation: 174719

No; that is for publishing messages on a schedule.

See https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

It's simply streamBridge.send("bindingName", "something-to-send").

The binding can either be pre-configured in application.yml or created on the first send.
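
As a rough illustration of that on-demand style, here is a minimal sketch of a service that sends one message per call. So that it compiles without Spring on the classpath, it uses a hypothetical MessageSender interface that mirrors only the send(bindingName, payload) call shown above; GreetingPublisher, RecordingSender, and the binding name greetings-out-0 are likewise made up for the example. In a real application you would inject StreamBridge and call send on it directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring streamBridge.send(bindingName, payload).
// Assumption: in a real app this role is played by the injected StreamBridge.
interface MessageSender {
    boolean send(String bindingName, Object payload);
}

// Hypothetical service that publishes only when asked to,
// for example from a REST handler or another event listener.
class GreetingPublisher {
    private final MessageSender sender;
    private final String bindingName;

    GreetingPublisher(MessageSender sender, String bindingName) {
        this.sender = sender;
        this.bindingName = bindingName;
    }

    // Sends a single message and reports whether the sender accepted it.
    boolean publish(String name) {
        return sender.send(bindingName, "Hello, " + name);
    }
}

// In-memory sender that records what was sent; useful for unit tests.
class RecordingSender implements MessageSender {
    final List<String> sent = new ArrayList<>();

    @Override
    public boolean send(String bindingName, Object payload) {
        sent.add(bindingName + ":" + payload);
        return true;
    }
}
```

Keeping the sender behind a small interface is only one possible design; it makes the publishing code testable without a broker, at the cost of one extra type.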

Upvotes: 1
