Nikola Stanković

Reputation: 881

How can I add a new object to an existing stream in reactive Java?

Assume I already have a reactive stream and now want to add one more object to it. How can I do this?

This is the approach I found. Is this the way to go?

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Created by ton on 10/11/16.
 */
public class Example {

    // One sink per subscriber. CopyOnWriteArrayList lets a subscriber be removed
    // while handleMessage is iterating, which a plain ArrayList does not allow.
    private final List<FluxSink<String>> handlers = new CopyOnWriteArrayList<>();

    public Flux<String> getMessagesAsStream() {
        return Flux.create(sink -> {
            handlers.add(sink);
            // setCancellation was removed in later Reactor releases;
            // onDispose is the current equivalent.
            sink.onDispose(() -> handlers.remove(sink));
        });
    }

    // Pushes the message to every currently subscribed sink.
    public void handleMessage(String message) {
        handlers.forEach(han -> han.next(message));
    }

    public static void main(String[] args) {
        Example example = new Example();
        example.getMessagesAsStream().subscribe(req -> System.out.println("req = " + req));
        example.getMessagesAsStream().subscribe(msg -> System.out.println(msg.toUpperCase()));
        example.handleMessage("een");
        example.handleMessage("twee");
        example.handleMessage("drie");
    }
}

Upvotes: 5

Views: 14960

Answers (1)

systemfreund

Reputation: 551

Suppose this is your existing stream:

Flux<Integer> existingStream = Flux.just(1, 2, 3, 4);

You can concatenate two streams:

Flux<Integer> appendObjectToStream = Flux.concat(existingStream, Flux.just(5));

This will produce [1, 2, 3, 4, 5], because concat subscribes to the second stream only after the first one has completed.
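As a minimal runnable sketch of the concatenation above (the class name `ConcatExample` and the helpers `append` and `appended` are made up for illustration, not part of Reactor):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class ConcatExample {

    // Hypothetical helper: emits everything from source, then the extra element.
    static <T> Flux<T> append(Flux<T> source, T extra) {
        return Flux.concat(source, Flux.just(extra));
    }

    // Collects the concatenated flux into a list, blocking until it completes.
    static List<Integer> appended() {
        Flux<Integer> existingStream = Flux.just(1, 2, 3, 4);
        return append(existingStream, 5).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(appended()); // prints [1, 2, 3, 4, 5]
    }
}
```

Note that the appended element is only emitted once the existing stream completes, so with a stream that never completes it would never show up. The instance method `existingStream.concatWith(Flux.just(5))` should behave the same way.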

Alternatively you could merge both streams:

Flux<Integer> mergeObjectWithStream = Flux.merge(existingStream, Flux.just(5));

That will produce a stream with the same elements, but merge subscribes to both sources right away and emits elements as they arrive, so the 5 could appear anywhere in the resulting flux.
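To make the ordering difference visible, here is a small sketch that assumes the existing stream emits its elements with a delay. The 50 ms delay, the class name `MergeExample` and the helper `merged` are assumptions chosen for the demonstration:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class MergeExample {

    // Merges a slow source with a single immediately available element.
    static List<Integer> merged() {
        // Assumed for illustration: each element of the existing stream is delayed by 50 ms.
        Flux<Integer> existingStream = Flux.just(1, 2, 3, 4)
                .delayElements(Duration.ofMillis(50));

        // merge subscribes to both sources eagerly and forwards elements as they arrive.
        return Flux.merge(existingStream, Flux.just(5))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // The 5 is available immediately, so it arrives before the delayed elements.
        System.out.println(merged()); // prints [5, 1, 2, 3, 4]
    }
}
```

With two purely synchronous sources such as `Flux.just(...)`, the merged output will usually look the same as with concat, so the difference only matters once at least one source emits asynchronously.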

Upvotes: 9
