Shvalb

Reputation: 1933

How to process items emitted by an Observable with access to values from another?

I need to make an async call_1 and receive its reply_1 as an Observable, then make another async call_2; when processing its reply_2, I also need access to reply_1.

I've tried something like:

public rx.Observable<Game> findGame(long templateId, GameModelType game_model, GameStateType state) {

    rx.Observable<RxMessage<byte[]>> ebs = context.getGameTemplate(templateId);

    return context.findGame(templateId, state) // findGame returns rx.Observable<RxMessage<byte[]>>

    .flatMap(new Func1<RxMessage<byte[]>, rx.Observable<Game>>() {

        @Override
        public Observable<Game> call(RxMessage<byte[]> gameRawReply) {

            Game game = null;

            switch(game_model) {

                case SINGLE: {

                    ebs.subscribe(new Action1<RxMessage<byte[]>>() {

                        @Override
                        public void call(RxMessage<byte[]> t1) {

                            game = singleGames.get(0);

                        }
                    });
                }
            }

            return rx.Observable.from(game);
        }
    });
}

This method still does not compile because of the final restriction on game: it is assigned from within the anonymous inner class.

Is this the right way to approach the problem, or is there a more natural way to accomplish what I'm trying to do?

Upvotes: 1

Views: 338

Answers (1)

david.mihola

Reputation: 12992

If I understand what you want to do correctly, I think the natural way to solve this would be zip:

You have two Observables that asynchronously emit their results, namely ebs and the return value of context.findGame(...).

You can combine their result by doing something like this:

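// game_model is declared final so the anonymous Func2 below can read it (required before Java 8)
public rx.Observable<Game> findGame(long templateId, final GameModelType game_model, GameStateType state) {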

    rx.Observable<RxMessage<byte[]>> ebs = context.getGameTemplate(templateId);
    rx.Observable<RxMessage<byte[]>> gameObs = context.findGame(templateId, state);

    return Observable.zip(gameObs, ebs, new Func2<RxMessage<byte[]>, RxMessage<byte[]>, Game>() {

        @Override
        public Game call(RxMessage<byte[]> gameRawReply, RxMessage<byte[]> t1) {

            Game game = null;

            switch(game_model) {
                case SINGLE: {
                    game = singleGames.get(0);
                }
            }

            return game;
        }
    });
}

The Func2 - the third argument of zip - is called each time both of your source Observables have emitted an item via onNext; zip pairs the first item of one with the first item of the other, the second with the second, and so on. It combines the paired values into a new value of type Game, which is then emitted to subscribers of the resulting Observable.
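To show the "combine once both results are in" behaviour without needing RxJava on the classpath, here is a minimal sketch using the JDK's CompletableFuture.thenCombine, which behaves roughly like zip over two single-valued asynchronous results. It is only an analogy, not the RxJava API. The fetchGame and fetchTemplate methods and the String replies are hypothetical stand-ins for context.findGame(...) and context.getGameTemplate(...).

```java
import java.util.concurrent.CompletableFuture;

final class CombineSketch {

    // Hypothetical stand-in for context.findGame(...): completes asynchronously.
    static CompletableFuture<String> fetchGame(long templateId) {
        return CompletableFuture.supplyAsync(() -> "game-" + templateId);
    }

    // Hypothetical stand-in for context.getGameTemplate(...): completes asynchronously.
    static CompletableFuture<String> fetchTemplate(long templateId) {
        return CompletableFuture.supplyAsync(() -> "template-" + templateId);
    }

    // The combining function runs only after BOTH futures have completed,
    // and it receives both results, much like the Func2 passed to zip.
    static CompletableFuture<String> findGame(long templateId) {
        return fetchGame(templateId).thenCombine(
                fetchTemplate(templateId),
                (gameReply, templateReply) -> gameReply + " built from " + templateReply);
    }

    public static void main(String[] args) {
        // join() blocks until the combined result is available.
        System.out.println(findGame(7L).join()); // prints "game-7 built from template-7"
    }
}
```

As with zip, the combining function returns a plain value rather than another asynchronous wrapper, and neither call has to wait for the other before it starts.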

EDIT: Some more information...

Note that I changed the return type of call() from Observable<Game> to just Game. Otherwise the result of zip would not have been an Observable<Game> but an Observable<Observable<Game>>. map has a flattening counterpart in flatMap, but zip has none: there is no flatZip in rx. Since you always want to emit exactly one Game for each pair of input items (one from ebs, one from gameObs), that's not a problem in this case.

Also, the call() of the Func2 could now be further simplified to just:

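@Override
public Game call(RxMessage<byte[]> gameRawReply, RxMessage<byte[]> t1) {

    switch(game_model) {
        case SINGLE: {
            return singleGames.get(0);
        }
        default: {
            // Same result as the longer version for any other model type;
            // also needed so that every path returns a value.
            return null;
        }
    }
}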

Upvotes: 1
