Rob

Reputation: 123

How do I write a subscriber that accepts data from two related streams?

I'm new to reactive programming and Project Reactor so apologies in advance if I use the wrong words.

I have a simple procedural program that I'm converting to reactive, more as a learning activity than anything else. It's not a web app, so I'm just using Project Reactor and not Spring WebFlux. In the old app, the logic is something like this:

An event is received. Take the customer ID from that event, get the full details of that customer, then pass both the event and the customer data to something downstream.

final OrderEvent event = mapper.readValue(json, OrderEvent.class);
final OrderData data = event.getData();

// Retrieve customer
final Customer customer = service.getCustomerWithId(data.getCustomerId());

// Log a summary
LOG.info(
    "Received an event indicating that {} ordered {} products", customer.getName(), data.getOrderCount());

downstream.doSomething(data, customer);

Ignoring the last statement, this seems fairly easy to implement with Reactor. The incoming JSON can be wrapped in a Mono<String>, and a series of map (and flatMap?) operators can transform it into a Mono<OrderData>. I changed the service to return a Mono<Customer>, but I'm struggling with how to then pass the output from the service to the downstream object. If it only needed the customer data, I'd make it a subscriber. I'm thrown, though, because it needs both the original data and the customer. If I just chain everything together, I'll lose the order data once it is transformed into a customer.

How should I approach this? I've thought about using ContextView but that seems to be for pushing data from the Subscriber upwards, not the other way around.

Edit: I tried Flux.zip. This may work in the use case described here, but is it the best way? I'm concerned about other use cases. For example, if the input was the customer ID and the web service returned all the orders for that customer as a Flux<OrderData>, the two streams being zipped would have different numbers of elements, and I'm not sure it would work.

// Convert incoming events (a List) into a stream of order data
final Flux<OrderData> orderData = Flux.fromIterable(this.events)
        .map(event -> event.getData());

// Get customers associated with order data. Each order should have exactly 1 customer.
final Flux<Customer> customers = orderData
        .flatMap(data -> this.service.apply(data.getCustomerId()));

// Do something
Flux.zip(customers, orderData)
        .doOnNext(tuple -> this.downstream.accept(tuple.getT1(), tuple.getT2()))
        .blockLast();

I'm only blocking at the end because there's nothing left to do. Once the downstream has processed everything, my program can exit.

I'm using Reactor 3.4.17.

Upvotes: 1

Views: 104

Answers (1)

Esteban Herrera

Reputation: 2291

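Yes, the zip operator can work, as long as both streams emit their elements in the same order. Note that flatMap does not guarantee ordering, so concatMap or flatMapSequential is safer when building the customers stream. Here's another approach: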

Flux.fromIterable(this.events)
  // flatMap (not map) so that the inner publisher is subscribed to and flattened
  .flatMap(
     event -> this.service.apply(event.getData().getCustomerId())
                 .flatMap(customer -> this.downstream.accept(customer, event.getData()))
  );

You can get the customer inside the lambda expression that takes the event as argument. This way, you can have a reference to both arguments, the event and the customer.
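
To make the scoping idea concrete, here is a minimal, self-contained sketch. The OrderData and Customer records, lookupCustomer, and summarize are hypothetical stand-ins for the question's types, service, and downstream step, not part of any real API. It uses concatMap so that results come out in input order; flatMap would work too but may interleave results.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NestedLookupSketch {

    // Hypothetical stand-ins for the question's domain types.
    record OrderData(String customerId, int orderCount) {}
    record Customer(String id, String name) {}

    // Hypothetical stand-in for the customer service: one customer per ID.
    static Mono<Customer> lookupCustomer(String customerId) {
        return Mono.just(new Customer(customerId, "Customer-" + customerId));
    }

    // Hypothetical downstream step that needs both the customer and the order.
    static Mono<String> summarize(Customer customer, OrderData data) {
        return Mono.just(customer.name() + " ordered " + data.orderCount() + " products");
    }

    static List<String> run(List<OrderData> orders) {
        return Flux.fromIterable(orders)
                // The outer lambda receives the order...
                .concatMap(data -> lookupCustomer(data.customerId())
                        // ...and 'data' is still in scope here, next to 'customer'.
                        .flatMap(customer -> summarize(customer, data)))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        run(List.of(new OrderData("42", 3), new OrderData("7", 1)))
                .forEach(System.out::println);
    }
}
```

Blocking at the end is only there so the sketch can return a plain List; in a fully reactive pipeline you would subscribe instead.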

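In the example above, I'm assuming this.downstream.accept returns a Mono (it can be an empty Mono, such as Mono<Void>, if you want), which is why the inner operator is flatMap. If it returned a Flux instead, you would use flatMapMany there. Either way, nothing runs until the resulting Flux is subscribed to (or blocked on, as in your example).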
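
If downstream.accept is a plain void callback instead, as the doOnNext call in the question suggests, one option is to pair each result with its source inside the same lambda and consume the pairs afterwards. The sketch below is written under assumptions: the Customer and OrderData records and the ordersFor lookup are hypothetical. It covers the case raised in the question's edit, where one customer yields several orders and zipping two separately built streams would misalign.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class PairingSketch {

    // Hypothetical stand-ins for the question's domain types.
    record Customer(String id, String name) {}
    record OrderData(String customerId, int orderCount) {}

    // Hypothetical service: a customer may have any number of orders.
    static Flux<OrderData> ordersFor(Customer customer) {
        if (customer.id().equals("1")) {
            return Flux.just(new OrderData("1", 2), new OrderData("1", 5));
        }
        return Flux.just(new OrderData(customer.id(), 1));
    }

    static List<String> run(List<Customer> customers) {
        List<String> seen = new ArrayList<>();
        Flux.fromIterable(customers)
                // Pair every order with the customer that produced it.
                .concatMap(customer -> ordersFor(customer)
                        .map(order -> Tuples.of(customer, order)))
                // Stand-in for a void downstream.accept(customer, order) call.
                .doOnNext(pair -> seen.add(pair.getT1().name() + ":" + pair.getT2().orderCount()))
                .blockLast();
        return seen;
    }

    public static void main(String[] args) {
        run(List.of(new Customer("1", "Ann"), new Customer("2", "Bob")))
                .forEach(System.out::println);
    }
}
```

Because each pair is built where both values are in scope, the number of orders per customer does not matter, which is not true of zipping two independent streams.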

Upvotes: 1
