skovmand
skovmand

Reputation: 4422

onNext not called in Retrofit and RxJava, when running in thread

I am trying to make requests to the Spotify Web API from Retrofit 2.1 while using RxJava. I would like each request to happen in its own thread and the results to be printed when they are ready, in any order.

The current code executes on the main thread and works. However when I insert .subscribeOn(Schedulers.newThread()) in the Rx chain, I get an empty output. It seems that the onNext() is never called.

I see this is often fixed by inserting .observeOn(AndroidSchedulers.mainThread()) in Android, but how can I do this in regular Java 8 (without RxAndroid)?

public interface SpotifyService {

    @GET("tracks/{trackId}")
    Observable<SpotifyTrack> getTrack(@Path("trackId") String id);

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("https://api.spotify.com/v1/")
            .addConverterFactory(JacksonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();
}


public class Main {

    private static SpotifyService spotifyService = SpotifyService.retrofit.create(SpotifyService.class);

    public static void main(String[] args) {
        String[] trackIds = {
            "spotify:track:2HUI2s84pkL5815G8WI1Lg",
            "spotify:track:1bZrI1KgVKr8Qfja9cnmGh",
            "spotify:track:1WP1r7fuvRqZRnUaTi2I1Q",
            "spotify:track:5kqIPrATaCc2LqxVWzQGbk",
            "spotify:track:0mWiuXuLAJ3Brin3Or2x6v",
            "spotify:track:0BF6mdNROWgYo3O3mNGrBc"
        };

        printTrackNames(trackIds);
    }

    private static void printTrackNames(String[] trackIds) {
        Observable.from(trackIds)
            .map(Main::toTrackId)
            .flatMap(spotifyService::getTrack)
            .map(SpotifyTrack::getName)
            .subscribe(System.out::println);
    }

    private static String toTrackId(String track) {
        if (track.contains("spotify:track:")) {
            return track.split(":")[2];
        }

        return track;
    }
}

Upvotes: 1

Views: 1920

Answers (2)

paul
paul

Reputation: 13471

It´s not about that is not going to onNext, but since it´s in another thread you cannot debug it.

To provee it, use TestSubscriber to wait for the finish of the observer.

Here a Unit Test to provee it.

@Test
public void testObservableAsync() throws InterruptedException {
    Subscription subscription = Observable.from(numbers)
                                          .subscribeOn(Schedulers.newThread())
                                          .subscribe(number -> System.out.println("Items emitted:" + total));
    System.out.println("I finish before the observable finish.  Items emitted:" + total);
    new TestSubscriber((Observer) subscription)
            .awaitTerminalEvent(100, TimeUnit.MILLISECONDS);
}

If want to see more asyn test take a look here https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Upvotes: 1

Ross Hambrick
Ross Hambrick

Reputation: 5940

Since you don't have a run loop that will continue executing like you have in Android, you will need to block the main thread so it doesn't exit main() and die before the subscription finishes. You can so this by using toBlocking()

Upvotes: 1

Related Questions