I'm trying to implement a reactive, in-memory repository. How should this be accomplished?
This is a blocking version of what I'm trying to do:
@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {

    private final List<Event> events;

    @Override
    public void save(final Mono<Event> event) {
        events.add(event.block());
        // event.subscribe(events::add); <- does not do anything
    }

    @Override
    public Flux<Event> findAll() {
        return Flux.fromIterable(events);
    }
}
I tried using event.subscribe(events::add); but the event was not added to the list (perhaps I'm missing something there?).
Perhaps events should be of type Flux<Event>, and there is some way to add the Mono<Event> to the Flux<Event>?
Upvotes: 2
Views: 997
Reputation: 499
I suggest using a Sink for this purpose.
public static class InMemEventRepository {

    // Single-threaded scheduler used to serialize emissions into the sink
    private final Scheduler serializerScheduler = Schedulers.single();
    // Replays all previously emitted events to every new subscriber
    private final Sinks.Many<Event> events = Sinks.many().replay().all();

    public void save(Mono<Event> event) {
        event
            .publishOn(serializerScheduler) // if events may be published from multiple threads, their emissions must be serialized
            .subscribe(x -> events.emitNext(x, Sinks.EmitFailureHandler.FAIL_FAST));
    }

    public Flux<Event> findAll() {
        return events.asFlux();
    }
}
This is with Reactor 3.4. With older versions you could have used a Processor, but processors are now deprecated. Sinks are generally easier to use, but they do not serialize emissions from multiple threads, which is why I use the Scheduler.
See also this answer for an alternative approach to serializing emissions from the Sink.
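One way to serialize without a dedicated Scheduler, offered here as my own sketch and not as a summary of the linked answer, is to give emitNext a failure handler that retries while the result is FAIL_NON_SERIALIZED. The class name RetryingSinkRepository and the constant RETRY_IF_CONCURRENT are made up for illustration, and the element type is generic because Event is not defined in this thread.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

// Hypothetical name; a sketch, not the linked answer's code.
class RetryingSinkRepository<T> {

    // Replays every previously saved element to each new subscriber.
    private final Sinks.Many<T> sink = Sinks.many().replay().all();

    // Returning true asks emitNext to try again; we do so only when
    // another thread was emitting at the same moment.
    private static final Sinks.EmitFailureHandler RETRY_IF_CONCURRENT =
            (signalType, emitResult) -> emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED;

    public void save(Mono<T> element) {
        element.subscribe(x -> sink.emitNext(x, RETRY_IF_CONCURRENT));
    }

    public Flux<T> findAll() {
        return sink.asFlux();
    }
}
```

Note that the retry spins on the calling thread for as long as another thread is emitting, so this is probably only reasonable when contention is short and rare.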
Upvotes: 2
Reputation: 7649
If you go for Flux.fromIterable, you'll only get a subscription for the previous events, but you'll lose the future ones.
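To illustrate the point about Flux.fromIterable, here is a minimal sketch (the class name FromIterableDemo is made up): the list is iterated at subscription time and the Flux then completes, so anything added to the list afterwards never reaches that subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical demo class, for illustration only.
class FromIterableDemo {

    // Returns what a subscriber to Flux.fromIterable actually received.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        events.add("first");

        List<String> received = new ArrayList<>();
        // The list is iterated when subscribe() is called; the Flux then completes.
        Flux.fromIterable(events).subscribe(received::add);

        // Added after the subscription has completed: the subscriber is not notified.
        events.add("second");
        return received;
    }
}
```

A fresh call to Flux.fromIterable(events).subscribe(...) would see both elements, but only as another one-off snapshot.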
I did a PoC some time ago trying to get a similar effect; you can check it out at https://github.com/AlbertoSH/KeepMeUpdated
The main idea is to have a central point where the events happen and to which the repository is subscribed. Whenever you subscribe to findAll, you'll get an infinite stream of List<Item>. Any saved item will trigger a new event, and anyone subscribed to findAll will get it.
Beware that this repo uses RxJava, so some porting to Reactor might be needed.
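As a rough idea of what such a port could look like in Reactor 3.4+, here is a minimal sketch. It is not taken from the linked repository; the class name SnapshotRepository is hypothetical and the element type is generic. A replay-latest sink acts as the central point: every save emits a fresh immutable copy of the whole list, and a new subscriber immediately receives the most recent one.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Hypothetical name; a sketch of the "stream of snapshots" idea.
class SnapshotRepository<T> {

    private final List<T> items = new ArrayList<>();

    // Keeps only the latest snapshot for subscribers that arrive later.
    private final Sinks.Many<List<T>> snapshots = Sinks.many().replay().latest();

    SnapshotRepository() {
        // Start with an empty snapshot so early subscribers get a value right away.
        snapshots.emitNext(List.of(), Sinks.EmitFailureHandler.FAIL_FAST);
    }

    // synchronized guards the list and keeps emissions serialized.
    public synchronized void save(T item) {
        items.add(item);
        snapshots.emitNext(List.copyOf(items), Sinks.EmitFailureHandler.FAIL_FAST);
    }

    // Never completes: emits the current snapshot, then one per save.
    public Flux<List<T>> findAll() {
        return snapshots.asFlux();
    }
}
```

Subscribers are called while the lock is held, which keeps the sketch short but would deserve a second look under heavy load.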
Upvotes: 0