Andrew Sasha
Andrew Sasha

Reputation: 1294

Good implementation/support for java.util.concurrent.Flow.Processor<T,R>

Recently, I found a good support for Publisher by projectreactor.io:

Flux.create(fluxSink -> {
           for (int i = 0; i < 10; i++)
            fluxSink.next(i);
           fluxSink.complete();
        })
                .map(...)
                .subscribe(...);

Is there any good support for Proccessor? I mean something like or simular:

XXX process = new XXX((inputValue, output) -> {
    if(inputValue == 0)
       output.error();
    else
       output.next(inputValue);
});

publisher.subscribe(process);  
process.subscribe(...);

If not, how can I implement my own or why I cannot do this?

Update 1:

After discussion (see comments) it appeared that in my use case I need use flatMap (see answer), my question was Good implementation of processor by this I meant some functionality that if it's fails I able to take control and emit error instead. I think flatMap will give you quite enough functionality. In my case I used:

        import org.jsoup.Jsoup;

        Flux.just("url")
            .flatMap(url -> {
                try {
                    Document document = Jsoup.connect(url).get();
                    return Flux.just(document);
                } catch (IOException e) {
                    return Flux.error(e);
                }
            })
            .subscribe();

Upvotes: 3

Views: 1121

Answers (3)

Simon Basl&#233;
Simon Basl&#233;

Reputation: 28301

From what you described of your use case, I don't expect you really require a Processor. Rather, use flatMap to trigger the asynchronous URL fetches. flatMap, like all Reactive Streams operators, will default to stopping immediately in case of error.

The only part where you might require a processor is to generate the Flux<URL> if you don't know the URLs in advance (otherwise, a Flux.fromIterable or Flux.just(...) would do just fine).

If you need to dispatch the result(s) to multiple Subscriber without re-triggering the requests, have a look at publish().connect() and/or cache().

Upvotes: 2

Naman
Naman

Reputation: 31908

You're probably looking for SubmissionPublisher which seems similar to the Flux implementation in reactor:

A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.

Note: A custom sample Flow.Processor is shared in the link which can be further customised to handle onError and consume method implementation as required for your use case.

Upvotes: 3

Jens Schauder
Jens Schauder

Reputation: 81950

It really depends on what you want to do.

Most of the methods on Flux create such processors and just return them as a Flux making sure they subscribe in the correct way to the upstream Flux.

So if your Processor should just emit an event for each one it receives, but a different one map is your simple way to create your Processor. If it creates multiple (or no) event for each event received use flatMap and so on.

You can create even more complex ones by chaining those methods. I'd expect 99% of the use cases to be handled this way just fine.

If that is not sufficient, consider the various overloads of subscribe where you can use Consumer to handle elements of a Flux as well as state changes like error, completion and subscription. And you can combine those with, among others with Flux.create(fluxSink -> ...) to construct pretty flexible Processors.

Upvotes: 2

Related Questions