massnerder

Reputation: 63

Single Stream and Multiple Subscribers

I'm testing the waters with Java 9 reactive streams and RxJava 2. I don't have a preference for either, but I'm looking for guidance on whether the following is possible.

  1. I'm creating a preconfigured number of subscribers like so:

    for(int i = 0; i<MAX_SUBSCRIBERS; i++) {  
         System.out.println("Creating subscriber: " + i);  
         publisher.subscribe(new MySubscriber<>(i + "-subscriber"));   
    }
    
  2. I'm reading a list of files from a directory so they can be uploaded concurrently to a third-party system.

    try (Stream<Path> paths = Files.list(Paths.get("/my/dir/with/files"))) {
        paths
            .filter(Files::isRegularFile)
            .forEach(pathName -> publisher.submit(pathName.toString()));
    }
    

I'm receiving the following output:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    0-subscriber: /my/dir/with/files/test1.txt received in onNext
    1-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

Ideally, each subscriber should work on a different file, giving output like this:

    0-subscriber: /my/dir/with/files/test0.txt received in onNext
    1-subscriber: /my/dir/with/files/test1.txt received in onNext

Is this possible? Any tips would be awesome!

Upvotes: 3

Views: 2339

Answers (3)

Alexei Kaigorodov

Reputation: 13535

Publishers come in two kinds: multicast and unicast. Multicast publishers feed every subscriber the full set of messages, while unicast publishers route each message to a single subscriber. SubmissionPublisher is multicast, as its documentation states.
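A minimal sketch reproducing the multicast behavior seen in the question, using only the JDK's `java.util.concurrent.Flow` and `SubmissionPublisher`. `MulticastDemo`, `RecordingSubscriber` and `run` are hypothetical names for illustration. Both subscribers receive both items, which is exactly the duplicated output in the question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class MulticastDemo {

    // Records every item it receives, prefixed with its own name.
    static class RecordingSubscriber implements Flow.Subscriber<String> {
        private final String name;
        private final List<String> log;
        private final CountDownLatch done;

        RecordingSubscriber(String name, List<String> log, CountDownLatch done) {
            this.name = name;
            this.log = log;
            this.done = done;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand, fine for a demo
        }

        @Override
        public void onNext(String item) {
            log.add(name + ": " + item);
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the items to two subscribers and returns every delivery they saw.
    public static List<String> run(List<String> items) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new RecordingSubscriber("0-subscriber", log, done));
            publisher.subscribe(new RecordingSubscriber("1-subscriber", log, done));
            items.forEach(publisher::submit);
        } // close() completes each subscriber after its buffered items are delivered
        done.await(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        run(List.of("test0.txt", "test1.txt")).forEach(System.out::println);
    }
}
```

The interleaving between the two subscribers is not deterministic, but each one sees the full sequence of submitted items.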

You can find unicast publisher implementations in my library, DF4J. Look for implementations of the interface org.df4j.protocol.Flow.Publisher, which extends org.reactivestreams.Publisher.

Upvotes: 1

Nicolai Parlog

Reputation: 51160

This was meant to be a comment, but it got too long. It's not a real answer, though, because I'm no reactive streams expert; it's more food for thought. 😉

My understanding is that each subscriber sees all published elements and that subscribers should be independent of one another (which I'd say excludes explicit coordination). If there is a material difference between the files (say, one is a PDF and the other a TXT file), subscribers might decide to act only on the types they were built for; otherwise, each should process every element.
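A minimal sketch of the "act only on the types they were built for" idea, using the JDK Flow API. `ExtensionFilterDemo`, `ExtensionSubscriber` and `run` are hypothetical names, and matching on the file-name extension is an assumed stand-in for whatever distinguishes the files. Every subscriber still receives every path; each simply ignores paths that are not its type, so no coordination between subscribers is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ExtensionFilterDemo {

    // Receives every published path but only handles the ones ending in its extension.
    static class ExtensionSubscriber implements Flow.Subscriber<String> {
        private final String extension;
        private final CountDownLatch done;
        final List<String> handled = new CopyOnWriteArrayList<>();

        ExtensionSubscriber(String extension, CountDownLatch done) {
            this.extension = extension;
            this.done = done;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String path) {
            if (path.endsWith(extension)) {
                handled.add(path); // a real subscriber would upload the file here
            }
            // paths of other types are seen but deliberately ignored
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes all paths; returns one subscriber per extension, in the given order.
    public static List<ExtensionSubscriber> run(List<String> paths, String... extensions)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(extensions.length);
        List<ExtensionSubscriber> subscribers = new ArrayList<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            for (String ext : extensions) {
                ExtensionSubscriber s = new ExtensionSubscriber(ext, done);
                subscribers.add(s);
                publisher.subscribe(s);
            }
            paths.forEach(publisher::submit);
        }
        done.await(5, TimeUnit.SECONDS);
        return subscribers;
    }

    public static void main(String[] args) throws InterruptedException {
        for (ExtensionSubscriber s : run(List.of("a.pdf", "b.txt", "c.pdf"), ".pdf", ".txt")) {
            System.out.println(s.extension + " -> " + s.handled);
        }
    }
}
```

This only splits the work when the files really differ by type; for a directory of similar files, every subscriber would still match every element.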

It looks like you're trying to spread the workload across several subscribers, which I assume run in different threads. That's definitely something existing concurrency constructs handle extremely well. Have a look at ExecutorService, for example.
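A minimal sketch of the ExecutorService approach, with no reactive streams involved. `ExecutorUploadDemo`, `uploadAll` and `upload` are hypothetical names; `upload` only stands in for the real third-party call. A fixed pool of worker threads receives one task per file, so each file is handled exactly once by whichever thread picks up its task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorUploadDemo {

    // Hypothetical stand-in for the real third-party upload call.
    static String upload(String path) {
        return Thread.currentThread().getName() + " uploaded " + path;
    }

    // Submits one task per file; the pool hands each task to exactly one worker thread.
    public static List<String> uploadAll(List<String> paths, int workers) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String path : paths) {
                tasks.add(() -> upload(path));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task is done; futures keep the task order
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        uploadAll(List.of("test0.txt", "test1.txt"), 2).forEach(System.out::println);
    }
}
```

The pool size plays the role of `MAX_SUBSCRIBERS` from the question: it caps how many uploads run at the same time.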

That said, if you're building a larger stream pipeline, I see no argument against encapsulating the distribute-file-processing-across-threads part in a single subscriber. It might even be a publisher itself, publishing the result of processing each file once that's done.
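A minimal sketch of encapsulating the distribution inside a single subscriber, assuming the JDK Flow API and a fixed thread pool. `DispatchingSubscriberDemo`, `DispatchingSubscriber` and `run` are hypothetical names, and recording the path in `processed` stands in for the real upload. Republishing the results (making the subscriber a publisher too) is left out to keep it short.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class DispatchingSubscriberDemo {

    // One Flow.Subscriber that hands each received path to a worker pool,
    // so every file is processed exactly once, on one of several threads.
    static class DispatchingSubscriber implements Flow.Subscriber<String> {
        private final ExecutorService workers;
        final List<String> processed = new CopyOnWriteArrayList<>();

        DispatchingSubscriber(int threads) {
            workers = Executors.newFixedThreadPool(threads);
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // the pool's queue absorbs the backlog
        }

        @Override
        public void onNext(String path) {
            workers.execute(() -> processed.add(path)); // hypothetical: upload(path) here
        }

        @Override
        public void onError(Throwable throwable) {
            workers.shutdownNow();
        }

        @Override
        public void onComplete() {
            workers.shutdown(); // tasks already queued still run to completion
        }

        // Waits until onComplete has shut the pool down and all queued tasks finished.
        boolean awaitTermination(long seconds) throws InterruptedException {
            return workers.awaitTermination(seconds, TimeUnit.SECONDS);
        }
    }

    public static List<String> run(List<String> paths) throws InterruptedException {
        DispatchingSubscriber subscriber = new DispatchingSubscriber(2);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            paths.forEach(publisher::submit);
        }
        if (!subscriber.awaitTermination(5)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return subscriber.processed;
    }

    public static void main(String[] args) throws InterruptedException {
        run(List.of("test0.txt", "test1.txt")).forEach(System.out::println);
    }
}
```

Requesting unbounded demand keeps the sketch simple; a production version would more likely request a bounded number of items and ask for more as workers finish.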

A final caveat: maybe RxJava has something up its sleeve for this specific use case. I'm curious to read other answers.

Upvotes: 2

akarnokd

Reputation: 70007

The Java 9 Flow API consists of four interfaces plus the SubmissionPublisher class, which dispatches every submitted value to all of its Subscribers. The JDK currently offers no tools that support the dataflow you describe.

In contrast, RxJava is a rich, fluent library with hundreds of operators, which lets you process items in parallel without duplicating them:

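    import io.reactivex.Flowable;
    import io.reactivex.parallel.ParallelFlowable;
    import io.reactivex.schedulers.Schedulers;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;
    import org.reactivestreams.Subscriber;

    // Open the directory listing lazily and close it when the flow terminates.
    ParallelFlowable<Path> pf =
            Flowable.<Path, Stream<Path>>using(
                () -> Files.list(Paths.get("/my/dir/with/files")),
                files -> Flowable.fromIterable((Iterable<Path>) () -> files.iterator()),
                AutoCloseable::close
            )
            .parallel(2)                       // split the items into 2 rails
            .runOn(Schedulers.computation())   // each rail runs on its own worker
            .filter(Files::isRegularFile);

    // One Subscriber per rail; the array length must match the parallelism (2).
    // MySubscriber must implement org.reactivestreams.Subscriber here, not Flow.Subscriber.
    pf.subscribe(new Subscriber[] {
        new MySubscriber<>("0-subscriber"),
        new MySubscriber<>("1-subscriber"),
    });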

Upvotes: 2
