Krishnaraj
Krishnaraj

Reputation: 2420

Subscribe on Observable Does Nothing

I am trying to understand how Observables are executed but can't seem to get this simple code to work.

public class RxJavaExample {
    public static void main(String[] args) {
        Observable<String> hello = Observable.fromCallable(() -> 
            getHello()).subscribeOn(Schedulers.newThread());

        hello.subscribe();

        System.out.println("End of main!");
    }

    public static String getHello() {
        System.out.println("Hello called in " + 
            Thread.currentThread().getName());
        return "Hello";
    }
}

Shouldn't hello.subscribe() execute getHello()?

Upvotes: 0

Views: 430

Answers (3)

Theresa Forster
Theresa Forster

Reputation: 1932

It may be you are getting confused between Threads and Observables,

The way I have used Observables in the past is for a timer on a Minecraft plugin, I have an event that is triggered every minute.

public class TimerHandler extends Observable implements Runnable{

    @Override
    public void run() {
        this.setChanged();
        this.notifyObservers();
    }
}

So this triggers every minute, and then to add events to the timer queue you just subscribe to the observable meaning that the subscribed calls are triggered every minute.

public class PlotTimer implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        ......

to subscribe i call the following

getServer().getScheduler().scheduleAsyncRepeatingTask(this,timerHandler,1200,1200);
timerHandler.addObserver(new PayDayTimer());
timerHandler.addObserver(new ProfileTimer());
timerHandler.addObserver(new PlotTimer());

Upvotes: -1

LeffeBrune
LeffeBrune

Reputation: 3477

@sfiss is right, this works just as you would expect:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaExample {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Observable<String> hello = Observable.fromCallable(() -> getHello())
        .subscribeOn(Schedulers.from(exec));

    hello.subscribe();

    System.out.println("End of main!");

    exec.shutdown();
    exec.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static String getHello() {
    System.out.println("Hello called in " + Thread.currentThread().getName());
    return "Hello";
  }
}

With the following output:

End of main!
Hello called in pool-1-thread-1

Upvotes: 0

sfiss
sfiss

Reputation: 2329

It is because your main thread finishes, before the background thread gets to getHello. Try to add a Thread.sleep(5000) in your main method before exiting.

Alternatively, wait until the onCompleted of your subscription is called.

EDIT: The reason why the program terminates is because RxJava spawns daemon threads. On the search for a good source, I also found this question, which probably answers it as well.

Upvotes: 2

Related Questions