Reputation: 63
I have a use case where I consume messages, save them, and then reply with success or failure. The Mongo insert returns an Observable, so I can chain it with flatMap. The problem is that the insert Observable emits the result of the insert, but I need the original Message emitted by the first Observable in order to reply. To make this work, I run the insert inside the subscribe of the first Observable and reply inside the second subscribe.
I was hoping to accomplish this in a more reactive manner with an operator such as flatMap. I searched through the list of operators but found nothing that does what I'm looking for.
    eb.consumer("persister.save.event").toObservable()
        .subscribe(msg -> {
            mongo.insertObservable("event", (JsonObject) msg.body())
                .subscribe(
                    res -> msg.reply(new JsonObject().put("success", true)),
                    error -> msg.fail(500, "failed to save event"));
        });
Is the above code the way it should be done or is there a better approach? The two subscribes don't feel right.
Upvotes: 3
Views: 110
Reputation: 15054
Here's what can be done to avoid two subscribers:
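    eb.consumer("persister.save.event").toObservable()
        .flatMap(msg -> mongo.insertObservable("event", (JsonObject) msg.body())
            // Replace the insert result with the original message.
            .map(mongoResponse -> msg)
            // msg is only in scope inside this lambda, so report the failure here.
            .doOnError(error -> msg.fail(500, "failed to save event"))
            // Swallow the error so one failed insert does not terminate the consumer stream.
            .onErrorResumeNext(Observable.empty()))
        .subscribe(msg -> msg.reply(new JsonObject().put("success", true)));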
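The trick is to map the Mongo result back to the original msg inside the flatMap, where msg is still in scope, so that everything downstream receives the message rather than the insert result.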
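For readers who want to try the pattern without Vert.x or RxJava on the classpath, here is a minimal sketch of the same idea using only the JDK's CompletableFuture. This is a swapped-in technique, not the Vert.x API: `thenApply` plays the role of the `map` inside the `flatMap`, and `FakeMessage`, `insert`, and `save` are hypothetical names invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class CarryMessageSketch {

    // Hypothetical stand-in for a Vert.x Message: it records how it was answered.
    static final class FakeMessage {
        final String body;
        String outcome = "pending";

        FakeMessage(String body) { this.body = body; }

        void reply(String value) { outcome = "reply:" + value; }

        void fail(int code, String reason) { outcome = "fail:" + code + ":" + reason; }
    }

    // Hypothetical stand-in for the Mongo insert: completes with an id,
    // or fails when the body is empty.
    static CompletableFuture<String> insert(String collection, String body) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (body.isEmpty()) {
            result.completeExceptionally(new IllegalArgumentException("empty document"));
        } else {
            result.complete(collection + "-id-" + body.length());
        }
        return result;
    }

    // Runs the insert, then answers on the original message.
    static CompletableFuture<FakeMessage> save(FakeMessage msg) {
        return insert("event", msg.body)
            // Discard the insert result and pass the original message downstream.
            .thenApply(insertResult -> msg)
            // msg is captured by the lambda, so both outcomes can be reported on it.
            .whenComplete((m, error) -> {
                if (error == null) {
                    m.reply("success");
                } else {
                    msg.fail(500, "failed to save event");
                }
            });
    }

    public static void main(String[] args) {
        FakeMessage ok = new FakeMessage("{\"type\":\"click\"}");
        FakeMessage bad = new FakeMessage("");
        save(ok);
        save(bad);
        System.out.println(ok.outcome);   // prints "reply:success"
        System.out.println(bad.outcome);  // prints "fail:500:failed to save event"
    }
}
```

Because the message is captured by the lambda rather than passed through the insert, both the success and the failure branch can answer on it, which is the same reason the error handling has to live inside the flatMap in the RxJava version.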
Upvotes: 2