Adam Berlin
Adam Berlin

Reputation: 43

Upgrading io.projectreactor version from 2.0.x to 3.0.4 - Using Spring framework

I have an issue while trying to make the upgrade.

Currently i'm using version 2.0.x, and in particular -

reactor.bus
reactor.rx.Stream
reactor.rx.Streams
reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

I'm using maven, and i have a single dependency regarding 'projectreactor' -

<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>

When upgrading to version 3.0.4.RELEASE, in order to keep using all the things i used before, i need to explicitly import -

<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>

And

<groupId>io.projectreactor</groupId>
<artifactId>reactor-stream</artifactId>

but i'm still missing

reactor.core.processor.RingBufferProcessor
reactor.fn.Consumer

and i'm not sure what to do.

Upvotes: 4

Views: 2070

Answers (2)

Aviad
Aviad

Reputation: 1549

reactor.rx.Stream -> reactor.core.publisher.Flux
reactor.rx.Streams -> reactor.core.publisher.Flux
reactor.rx.Promise -> reactor.core.publisher.Mono and reactor.core.publisher.MonoProcessor
reactor.core.processor.RingBufferProcessor -> reactor.core.publisher.TopicProcessor
reactor.fn.Consumer -> java.util.function.Consumer (Java 8)

There is no new spring module since spring 5 directly includes Reactor support with these new types.

As for reactor-bus : By design now all stream routes (Flux/Mono chains) are typed, so dynamic routing is not part of our features yet. Still there are alternative in a typed way, for instance :

ReplayProcessor<MyEvent> rp = ReplayProcessor.create();
Flux<MyEvent> interest1 = rp.filter(ev -> filterInterest1(ev));
Flux<MyEvent> interest2 = rp.filter(ev -> filterInterest2(ev));
Flux<MyEvent> interest1_2 = rp.filter(ev -> filterInterest1(ev) || filterInterest2(ev));

interest1.subscribe(doSomethingForInterest1);
interest2.subscribe(doSomethingForInterest2);
interest1_2.subscribe(doSomethingForInterest1_2);

rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest1")); //subscriber 1 and 3 react
rp.onNext(new MyEvent("interest2")); //subscriber 2 and 3 react
rp.onNext(new MyEvent("interest4")); // buffered until interest subscriber because ReplayProcessor

//shutdown/cleanup/close
rp.onComplete();

I have found this on github which seems to fit your needs

Upvotes: 2

Yaroslav Stavnichiy
Yaroslav Stavnichiy

Reputation: 21456

reactor.fn.Consumer is replaced by Java 8 java.util.function.Consumer.

As for RingBufferProcessor you have to pick one of new processors all using ring buffer.

Dispatchers are now Schedulers that use Java's Executors under the hood.

Upvotes: 1

Related Questions