Reputation: 1933
I have a problem with the following code:
public void foo(List<Player> players_list) {
    JsonArray playersArr = new JsonArray();
    rx.Observable.from(players_list) // assume players_list contains 2 players
        .concatMap(player -> {
            return getUser(player.getUserId()) // get the 'User' entity from the DB
                .flatMap(userObj -> {
                    User user = ...;
                    playersArr.add(new JsonObject()
                        .putString("uid", player.getExtUserId())
                    );
                    return rx.Observable.just(playersArr);
                });
        }).subscribe(playersObj -> {
        }, (exception) -> {
            log.error("", exception);
        }, () -> {
            // at this point I expect 'playersArr' to contain 2 entries!
        });
}
When I run this code, the output appears to be non-deterministic: most of the time I get the expected 2 entries in the JsonArray, but sometimes I get only 1.
I'm trying to figure out why.
EDIT:
I tried switching from .flatMap to .concatMap, and that seems to solve the problem, but I'm not sure it's really a good solution.
Upvotes: 1
Views: 150
Reputation: 11515
I assume that playersArr is not thread-safe. If your Observable runs in an asynchronous context, two add calls on playersArr can overlap, and one of them may be lost.
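To illustrate the kind of lost update I mean, here is a minimal JDK-only sketch. It does not use RxJava or JsonArray; it assumes JsonArray behaves like a plain unsynchronized list, and the class and method names (SharedListRace, addConcurrently) are made up for this example. Several threads add to the same list, and the final size is compared against the number of add calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class SharedListRace {
    // Starts `threads` workers that each call target.add() `perThread` times,
    // then returns how many elements actually ended up in the list.
    static int addConcurrently(List<Integer> target, int threads, int perThread) {
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                try {
                    start.await(); // release all workers at the same moment
                    for (int i = 0; i < perThread; i++) {
                        target.add(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    // an unsynchronized list may also fail outright under contention
                }
            });
            workers.add(worker);
            worker.start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return target.size();
    }

    public static void main(String[] args) {
        int expected = 8 * 10_000;
        // Plain ArrayList: the size may come out smaller than expected.
        int unsafe = addConcurrently(new ArrayList<>(), 8, 10_000);
        // Synchronized wrapper: every add is applied.
        int safe = addConcurrently(Collections.synchronizedList(new ArrayList<>()), 8, 10_000);
        System.out.println("expected=" + expected + " unsafe=" + unsafe + " safe=" + safe);
    }
}
```

The unsynchronized run does not fail every time, which matches the "most of the time it works" symptom in the question.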
Your Observable has a side effect, because it updates an object that exists outside the Observable. To avoid this, you can change your code so that the Observable builds the JsonArray itself:
public void foo(List<Player> players_list) {
    rx.Observable.from(players_list) // assume players_list contains 2 players
        .concatMap(player -> getUser(player.getUserId()) // get the 'User' entity from the DB
            .map(userObj -> new JsonObject().putString("uid", player.getExtUserId())))
        .reduce(new JsonArray(), (acc, obj) -> acc.add(obj)) // build the JsonArray inside the stream
        .subscribe(playersObj -> {
            // playersObj is the JsonArray built by reduce; expect 2 entries here
        },
        (exception) -> { log.error("", exception); },
        () -> { });
}
I'm not sure of my code, but it should be close to what you need, I think.
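Since I can't run the RxJava version here, the same idea can be sketched with only the JDK. This is an analogy, not the RxJava API: CompletableFuture stands in for the Observable returned by getUser, and lookupUid and collectUids are hypothetical names invented for the example. The point is that the result collection is produced by the pipeline itself, so no lambda writes to a variable declared outside it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class CollectInsidePipeline {
    // Hypothetical stand-in for getUser(...): resolves a value asynchronously.
    static CompletableFuture<String> lookupUid(String userId) {
        return CompletableFuture.supplyAsync(() -> "uid-" + userId);
    }

    // Starts all lookups, then builds the result list from the completed values.
    static List<String> collectUids(List<String> userIds) {
        List<CompletableFuture<String>> pending = userIds.stream()
                .map(CollectInsidePipeline::lookupUid)
                .collect(Collectors.toList());
        return pending.stream()
                .map(CompletableFuture::join) // waits for each lookup, in input order
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(collectUids(Arrays.asList("a", "b"))); // prints [uid-a, uid-b]
    }
}
```

Because each lookup only returns a value and the list is assembled in one place, the number of entries always equals the number of inputs, regardless of which thread finishes first.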
Upvotes: 2