user14900768
user14900768

Reputation:

How do you build a reactive in-memory repository with Spring WebFlux?

I'm trying to implement a reactive, in-memory repository. How should this be accomplished?

This is a blocking version of what I'm trying to do

@Repository
@AllArgsConstructor
public class InMemEventRepository implements EventRepository {

    private final List<Event> events;

    @Override
    public void save(final Mono<Event> event) {
        events.add(event.block());
        // event.subscribe(events::add); <- does not do anything
    }

    @Override
    public Flux<Event> findAll() {
        return Flux.fromIterable(events);
    }

}

I tried using event.subscribe(events::add); but the event was not added to the list (perhaps I'm missing something there?)

Perhaps events should be of type Flux<Event> and there is some way to add the Mono<Event> to Flux<Event>?

Upvotes: 2

Views: 997

Answers (2)

Neil Swingler
Neil Swingler

Reputation: 499

I suggest to use a Sink for this purpose.

  public static class InMemEventRepository {
    private final Scheduler serializerScheduler = Schedulers.single();

    private final Sinks.Many<Event> events = Sinks.many().replay().all();

    public void save(Mono<Event> event) {
      event
          .publishOn(serializerScheduler) // If event will be published on multiple threads you need to serialize them
          .subscribe(x -> events.emitNext(x, EmitFailureHandler.FAIL_FAST)); 
    }

    public Flux<Event> findAll() {
      return events.asFlux();
    }
  }

This is with reactor 3.4. With older versions you could have used a Processor but they are now deprecated. Sinks in general are easier to use but they do not serialize emission from multiple threads. That's why I use the Scheduler.

See also this answer for an alternative approach to serialize emission from the Sink

Upvotes: 2

Alberto S.
Alberto S.

Reputation: 7649

If you go for the Flux.fromIterable, you'll only get a subscription for the previous events, but you'll loose the future ones

I did a PoC some time in the past trying to get a similar effect, you can check it in https://github.com/AlbertoSH/KeepMeUpdated

The main idea is to have a central point in which the events happens and the repository is subscribed to. Whenever you subscribe to findAll, you'll get an infinite stream of List<Item>. Any saved item will trigger a new event and anyone subscribed to findAll will get it

Beware that this repo is using RxJava, so some port to reactor might be needed

Upvotes: 0

Related Questions