Reputation: 1294
Recently, I found a good support for Publisher by projectreactor.io:
Flux.create(fluxSink -> {
for (int i = 0; i < 10; i++)
fluxSink.next(i);
fluxSink.complete();
})
.map(...)
.subscribe(...);
Is there any good support for Proccessor? I mean something like or simular:
XXX process = new XXX((inputValue, output) -> {
if(inputValue == 0)
output.error();
else
output.next(inputValue);
});
publisher.subscribe(process);
process.subscribe(...);
If not, how can I implement my own or why I cannot do this?
Update 1:
After discussion (see comments) it appeared that in my use case I need use flatMap
(see answer), my question was Good implementation of processor by this I meant some functionality that if it's fails I able to take control and emit error instead. I think flatMap
will give you quite enough functionality. In my case I used:
import org.jsoup.Jsoup;
Flux.just("url")
.flatMap(url -> {
try {
Document document = Jsoup.connect(url).get();
return Flux.just(document);
} catch (IOException e) {
return Flux.error(e);
}
})
.subscribe();
Upvotes: 3
Views: 1121
Reputation: 28301
From what you described of your use case, I don't expect you really require a Processor
. Rather, use flatMap
to trigger the asynchronous URL fetches. flatMap
, like all Reactive Streams operators, will default to stopping immediately in case of error.
The only part where you might require a processor is to generate the Flux<URL>
if you don't know the URLs in advance (otherwise, a Flux.fromIterable
or Flux.just(...)
would do just fine).
If you need to dispatch the result(s) to multiple Subscriber
without re-triggering the requests, have a look at publish().connect()
and/or cache()
.
Upvotes: 2
Reputation: 31908
You're probably looking for SubmissionPublisher
which seems similar to the Flux
implementation in reactor:
A
Flow.Publisher
that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using aSubmissionPublisher
allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.
Note: A custom sample Flow.Processor
is shared in the link which can be further customised to handle onError
and consume
method implementation as required for your use case.
Upvotes: 3
Reputation: 81950
It really depends on what you want to do.
Most of the methods on Flux
create such processors and just return them as a Flux
making sure they subscribe in the correct way to the upstream Flux
.
So if your Processor
should just emit an event for each one it receives, but a different one map
is your simple way to create your Processor
. If it creates multiple (or no) event for each event received use flatMap
and so on.
You can create even more complex ones by chaining those methods. I'd expect 99% of the use cases to be handled this way just fine.
If that is not sufficient, consider the various overloads of subscribe
where you can use Consumer
to handle elements of a Flux
as well as state changes like error, completion and subscription. And you can combine those with, among others with Flux.create(fluxSink -> ...)
to construct pretty flexible Processors
.
Upvotes: 2