hradecek
hradecek

Reputation: 2513

Terminate and get results from an event bus

Let's say I've got two verticles that are discovering special file names (for example. could be anything though) and publishing them to event bus e.g. one is reading names from a REST api and another from a filesystem:

ScanRestVerticle.java

/**
 * Reads file names through an REST API
 */
public class ScanRestVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL");

        req.toObservable()
           .flatMap(HttpClientResponse::toObservable)
           .lift(unmarshaller(Model.class))
           .subscribe(c -> vertx.eventBus().publish("address", c.specialName()));
        req.exceptionHandler(Throwable::printStackTrace);
        req.end();
    }
}

ScanFsVerticle.java

/**
 * Reads file names from a file
 */
public class ScanFsVerticle.java extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        StringObservable.byLine(vertx.fileSystem()
            .rxReadFile("myFileNames.txt")
            .map(Buffer::toString)
            .toObservable())
            .subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage()));
    }
}

Everything works great here, but, now, I have an verticle which is combining these names from an event bus and print them on STD.OUT:

PrintVerticle.java

public class PrintVerticle extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        vertx.eventBus()
            .<JsonObject>consumer("address")
            .bodyStream()
            .toObservable()
            .reduce(new JsonArray(), JsonArray::add)
            .subscribe(j -> System.out.println(j.toString()));
}

The problem is that reduce here is never actually completed, because event bus is making an infinite stream as I think.

So how do I actually complete this operation and print the names published by both verticles?

Note: I'm really new in vert.x and rx and I might missing some pieces or get something wrong, so please dont' judge :)

Thanks in advance.

EDIT: I could call scan() instead of reduce() here, to get intermediate results, but how would I get scan().last()?

Upvotes: 1

Views: 514

Answers (1)

yosriz
yosriz

Reputation: 10267

You're assuming correctly, reduce() operator relies on onComplete() event to know where to stop to collect.
while scan() is accumulator that emit accumulated value for each new emission of source stream, so scan will give you intermediate results.
The issue here is, that your'e using an EventBus, Conceptually, EventBus is infinite stream, so onComplete() should never be called. technically it might be called when you close() the `EventBus, but I guess yo shouldn't and don't want to rely upon it.

  • If you do want to use an EventBus, you should somehow end your PrintVerticle stream, so you'll have onComplete() event. for instance, You can use the EventBus to publish some events that publishes each of the 2 observed vertices end. then you can zip it and end your stream (using takeUntil()) at PrintVerticle after this 2 events emitted.
    Something like this:

    Observable vertice1EndStream = vertx.eventBus()
        .<JsonObject>consumer("vertice1_end")
        .bodyStream()
        .toObservable();
    
    Observable vertice2EndStream = vertx.eventBus()
        .<JsonObject>consumer("vertice2_end")
        .bodyStream()
        .toObservable()
    
    Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object());
    
    vertx.eventBus()
        .<JsonObject>consumer("address")
        .bodyStream()
        .toObservable()
        .takeUntil(bothVerticesEndStream)
        .reduce(new JsonArray(), JsonArray::add)
        .subscribe(j -> System.out.println(j.toString()));
    
  • Another option, is to use directly the 2 streams instead of the EventBus. merge them together and then use reduce(), as those 2 streams are finite ones, you will not have a problem.

Upvotes: 1

Related Questions