zylum
zylum

Reputation: 63

Chaining Observable & Emitting / Passing Original Emit to Subscribe Call

I have a use case where I'm consuming messages, saving them and then replying success or fail. The mongo insert returns an Observable so I can chain using flatmap. The issue is the insert Observable emits the result of the insert, but I need the original Message emit from the first observable to reply on. Thus to make this work, I'm running the insert inside the subscribe of the first Observable and replying inside the second subscribe.

I was hoping to accomplish this in a more reactive manner with some sort of operator like flatmap. I searched through the list of operators and nothing comes up with what I'm looking for.

eb.consumer("persister.save.event").toObservable()
    .subscribe(msg -> {
        mongo.insertObservable("event", (JsonObject) msg.body())
            .subscribe(
                res -> msg.reply(new JsonObject().put("success", true)),
                error -> msg.fail(500, "failed to save event"));
            });

Is the above code the way it should be done or is there a better approach? The two subscribes don't feel right.

Upvotes: 3

Views: 110

Answers (1)

Egor Neliuba
Egor Neliuba

Reputation: 15054

Here's what can be done to avoid two subscribers:

eb.consumer("persister.save.event").toObservable()
    .flatMap(msg -> mongo.insertObservable("event", (JsonObject) msg.body()).map(mongoResponse -> msg))
    .subscribe(
            res -> msg.reply(new JsonObject().put("success", true)),
            error -> msg.fail(500, "failed to save event"));

The trick is to map your mongo result to wanted msg inside the flatMap.

Upvotes: 2

Related Questions