Vivek Mangal
Vivek Mangal

Reputation: 652

Creating threads in Rxjava

I am passing two arguments in just() operator. Snippet of my code is :

Observable<Integer> observable = Observable.just(1,2);
observable.subscribeOn(Schedulers.newThread())
                        .subscribe(
                                new Observer<Integer>() {
                                    @Override
                                    public void onSubscribe(Disposable d) {

                                    }

                                    @Override
                                    public void onError(Throwable e) {

                                    }

                                    @Override
                                    public void onComplete() {

                                    }

                                    @Override
                                    public void onNext(Integer e) {
                                        System.out.println(e);
                                        //request web service

                                });

What I observed is it is not making separate thread for each emitted item. Items appearing as just arguments are running sequentially. How to create separate thread for each emitted item?

Upvotes: 2

Views: 4292

Answers (1)

paul
paul

Reputation: 13471

You can use flatMap and inside the flatMap create the new observable and use the subscribeOn

@Test
public void test() {
    Observable.just(1, 2)
            .flatMap(item -> Observable.just(item)
                    .subscribeOn(Schedulers.newThread())
                    .doOnNext(i -> System.out.println("Thread:" + Thread.currentThread())))
            .subscribe(System.out::println);
}

You can see more examples about async observable here https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Upvotes: 3

Related Questions