Aravind Yarram

Reputation: 80166

Equivalent in RxJava

In C# Rx we can execute code asynchronously with Observable.Start(), as shown below. What is the equivalent in RxJava?

void Main()
{
    AddTwoNumbersAsync(5, 4)
        .Subscribe(x => Console.WriteLine(x));
}

IObservable<int> AddTwoNumbersAsync(int a, int b)
{
    return Observable.Start(() => AddTwoNumbers(a, b));
}

int AddTwoNumbers(int a, int b)
{
    return a + b;
}

Upvotes: 2

Views: 917

Answers (2)

Florian G.

Reputation: 11

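I would use RxJava's Flowable type.

// RxJava 3 package names; RxJava 2 uses io.reactivex.Flowable and io.reactivex.schedulers.Schedulers.
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public static void main(String[] args) throws Exception {
    Flowable.fromCallable(() -> addTwoNumbers(5, 4))
            // Run the callable on a computation thread instead of main.
            .subscribeOn(Schedulers.computation())
            .subscribe(result -> System.out.println(result));
    // Scheduler threads are daemon threads, so keep main alive until the result is printed.
    Thread.sleep(1000);
}

private static int addTwoNumbers(int a, int b) {
    return a + b;
}

fromCallable on its own is synchronous: without subscribeOn, both the method call and the println run on the thread that calls subscribe (here, main). Placing .subscribeOn(Schedulers.computation()) before .subscribe(...) moves them onto RxJava's computation thread pool. Another scheduler, such as Schedulers.io(), can be substituted.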
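To check which thread actually runs the callable, a small sketch can return the thread name from inside fromCallable. It assumes RxJava 3 is on the classpath (the io.reactivex.rxjava3 packages). ThreadCheck and its two helper methods are illustrative names, not part of RxJava.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

// Hypothetical helper class for illustration only.
class ThreadCheck {

    // No scheduler: the callable runs on whichever thread subscribes.
    static String withoutScheduler() {
        return Flowable.fromCallable(() -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // subscribeOn moves the subscription, and so the callable, to a computation thread.
    static String withScheduler() {
        return Flowable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.computation())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:     " + Thread.currentThread().getName());
        System.out.println("without scheduler: " + withoutScheduler());
        System.out.println("with scheduler:    " + withScheduler());
    }
}
```

Without subscribeOn the reported name is the caller's own thread. With it, the name belongs to one of the computation scheduler's threads.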

You can also write a method that returns the Flowable, which is closer to your original example.

public static void main(String[] args) throws Exception {
    addTwoNumbersAsync(5, 4)
            .subscribe(result -> System.out.println(result));
    // Give the background thread time to print before main exits.
    Thread.sleep(1000);
}

private static Flowable<Integer> addTwoNumbersAsync(int a, int b) {
    // The sum is computed on a computation thread when a subscriber arrives.
    return Flowable.fromCallable(() -> a + b)
            .subscribeOn(Schedulers.computation());
}

Upvotes: 1

Adam S

Reputation: 16394

You can defer the operation until subscription, and ensure that subscription happens on another thread:

Observable<Integer> sumDeferred = Observable.defer(new Func0<Observable<Integer>>() {
        @Override
        public Observable<Integer> call() {
            return Observable.just(addTwoNumbers(5, 4));
        }
    }).subscribeOn(Schedulers.io());
sumDeferred.subscribe(...);
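Func0 in the snippet above is an RxJava 1 type. As a rough sketch of the same defer-plus-subscribeOn idea on a newer release, assuming RxJava 3 (the io.reactivex.rxjava3 packages, which this answer does not mention), a lambda can replace the anonymous class. DeferredSum is a hypothetical class name, and blockingFirst() is used only so the sketch can print the result without sleeping.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

// Hypothetical wrapper class for illustration only.
class DeferredSum {

    static int addTwoNumbers(int a, int b) {
        return a + b;
    }

    // defer postpones the addTwoNumbers call until someone subscribes;
    // subscribeOn makes that subscription happen on an I/O scheduler thread.
    static Observable<Integer> addTwoNumbersAsync(int a, int b) {
        return Observable.defer(() -> Observable.just(addTwoNumbers(a, b)))
                .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        // Block the calling thread until the single result arrives.
        int sum = addTwoNumbersAsync(5, 4).blockingFirst();
        System.out.println(sum); // prints 9
    }
}
```

In real code a non-blocking subscribe(...) would normally replace blockingFirst(), which is here to keep the example short.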

Upvotes: 5
