Jonathan Schoreels

Reputation: 1720

Java 8 : Stream : Consuming steps to save Intermediate States of streamed elements

First of all, sorry for the abstract title. The idea is simpler with an example. Let's say I have some values in a list L. I want to build a Request for each value, send each Request to a service, and collect all the Responses.

Currently, I'm using this kind of code structure:

private final List<String> bodies = ImmutableList.of(
        "1", "2", "3", "4"
);

@Test
public void BasicRequestResponseStreamToList() {

    final List<Request> requests = bodies.stream()
            .map(Request::new)
            .collect(Collectors.toList());

    final List<Response> responses = requests.stream()
            .map(service::send)
            .collect(Collectors.toList());

    commonAssertions(requests, responses);

}

However, needing two streams seems inefficient to me, because the last request has to be built before the first one can be sent. I would like to do something like this:

@Test
public void StatefulMapperStreamRequestResponseToList() {

    final List<Request> requests = new ArrayList<>();
    final List<Response> responses = bodies.stream()
            .map(Request::new)
            .map(x -> {
                requests.add(x);
                return service.send(x);
            })
            .collect(Collectors.toList());

    commonAssertions(requests, responses);

}

However, I feel guilty about using such a hack, because it abuses the semantics of map. Still, it's the only way I've found to build two correlated lists lazily. The first solution doesn't suit me because it has to build all the requests before sending any of them. I would love to achieve something like a Wire Tap in EIP: http://camel.apache.org/wire-tap.html
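
To make the ordering concern concrete, here is a minimal sketch of how a single sequential pipeline pushes each element through every stage before moving on to the next one. The class name PipelineOrderDemo and the "req-"/"resp-" strings are placeholders invented for this illustration; they stand in for Request::new and service::send, which are not shown in the post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineOrderDemo {

    // Records the order in which the "build" and "send" stages see each element.
    // peek() is used only to observe the pipeline, which is its intended debugging role.
    static List<String> trace(List<String> bodies) {
        List<String> log = new ArrayList<>();
        bodies.stream()
                .peek(body -> log.add("build " + body))
                .map(body -> "req-" + body)          // stand-in for Request::new
                .peek(req -> log.add("send " + req))
                .map(req -> "resp-" + req)           // stand-in for service::send
                .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        // Sequential stream: each element is built and sent before the next is built.
        trace(Arrays.asList("1", "2", "3")).forEach(System.out::println);
    }
}
```

With two separate pipelines and an intermediate collect, all "build" entries would appear before the first "send" entry, which is the behaviour the first version of the test exhibits.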

I would gladly hear your thoughts on a more elegant way to achieve this than bending the semantics of the map method.

If it helps, you can find the source here : http://tinyurl.com/hojkdzu

Upvotes: 2

Views: 2609

Answers (2)

Tagir Valeev

Reputation: 100249

Using .peek(), while it requires fewer changes to your code, is actually quite a dirty solution. You need it because there is a design flaw in your original code: you have "parallel data structures" (the term is probably not ideal), where the first element of the requests list corresponds to the first element of the responses list, and so on. When you are in that situation, consider creating a new POJO class instead. Something like this:

public class RequestAndResponse { // You may think up a better name
    public final Request req; // use getters if you don't like public final fields
    public final Response resp;

    public RequestAndResponse(Request req, Response resp) {
        this.req = req;
        this.resp = resp;
    }
}

Now your problem magically disappears. You can write:

List<RequestAndResponse> reqresp = bodies.stream()
        .map(Request::new)
        .map(req -> new RequestAndResponse(req, service.send(req)))
        .collect(Collectors.toList());

commonAssertions(reqresp);

You will need to change the commonAssertions method after that, but I'm pretty sure it will become simpler. You may also find that some methods in your code use the request and the response together, so it's quite natural to make them methods of the RequestAndResponse class.
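
If some existing code still expects two separate lists, they can be derived from the paired list afterwards. The following is only a sketch under assumptions: the Request and Response classes here are stand-ins with made-up fields (body, payload), the exchange method is a hypothetical helper, and the echo lambda replaces the real service::send, none of which appear in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class PairedResultsDemo {

    // Stand-ins for the question's types; the real ones are not shown in the post.
    static final class Request {
        final String body;
        Request(String body) { this.body = body; }
    }

    static final class Response {
        final String payload;
        Response(String payload) { this.payload = payload; }
    }

    static final class RequestAndResponse {
        final Request req;
        final Response resp;
        RequestAndResponse(Request req, Response resp) {
            this.req = req;
            this.resp = resp;
        }
    }

    // Builds each request, sends it, and keeps the request and its response together.
    static List<RequestAndResponse> exchange(List<String> bodies,
                                             Function<Request, Response> send) {
        return bodies.stream()
                .map(Request::new)
                .map(req -> new RequestAndResponse(req, send.apply(req)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical echo service standing in for service::send.
        List<RequestAndResponse> pairs =
                exchange(Arrays.asList("1", "2", "3"), req -> new Response("echo:" + req.body));

        // The separate lists can still be derived when an older API needs them.
        List<Request> requests = pairs.stream().map(p -> p.req).collect(Collectors.toList());
        List<Response> responses = pairs.stream().map(p -> p.resp).collect(Collectors.toList());

        System.out.println(requests.size() + " requests, " + responses.size() + " responses");
    }
}
```

Because each pair is created in one map step, the request and its response cannot drift out of alignment, and no state outside the pipeline is mutated.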

Upvotes: 4

fge

Reputation: 121760

I'm not sure what you really want to achieve here, but if you want to collect requests "as you go", you can use peek():

final List<Request> requests = new ArrayList<>();

final List<Response> responses = bodies.stream()
    .map(Request::new)
    .peek(requests::add)
    .map(service::send)
    .collect(Collectors.toList());

commonAssertions(requests, responses);

.peek() is pretty much what you describe in the title of your post: it takes a Consumer, can occur at any step in the pipeline, and here the consumer just saves the "intermediate states" into a list.

BUT... the javadoc of peek() specifically mentions this:

For parallel stream pipelines, the action may be called at whatever time and in whatever thread the element is made available by the upstream operation. If the action modifies shared state, it is responsible for providing the required synchronization.

So be careful: the plain ArrayList used above is not thread-safe, so this only works as written while the stream stays sequential.
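
As a rough illustration of that javadoc warning, here is one possible way to keep the peek() approach from corrupting the list when the stream is parallel, by wrapping the list with Collections.synchronizedList. The class ParallelPeekDemo and the "req-"/"resp-for-" strings are invented for this sketch and stand in for the real Request and service.send. Note what synchronization does not fix: the collected responses keep the encounter order, but the requests list is filled in whatever order the threads reach peek(), so index i in one list no longer necessarily matches index i in the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelPeekDemo {

    // Returns [requests, responses] produced by a parallel pipeline.
    static List<List<String>> run(int n) {
        // Synchronized wrapper: concurrent add() calls from peek() are safe.
        List<String> requests = Collections.synchronizedList(new ArrayList<>());

        List<String> responses = IntStream.range(0, n)
                .parallel()
                .mapToObj(i -> "req-" + i)          // stand-in for Request::new
                .peek(requests::add)                // may run on any thread, in any order
                .map(req -> "resp-for-" + req)      // stand-in for service::send
                .collect(Collectors.toList());      // preserves encounter order

        return Arrays.asList(requests, responses);
    }

    public static void main(String[] args) {
        List<List<String>> out = run(8);
        System.out.println("requests (arrival order):    " + out.get(0));
        System.out.println("responses (encounter order): " + out.get(1));
    }
}
```

If the two lists must stay correlated under parallel execution, carrying the request alongside its response in a single element avoids relying on the side effect at all.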

Upvotes: 3
