Reputation: 80166
We can execute some code asynchronously in C# Rx, as shown below, using Observable.Start()
. I am wondering what is the equivalent in RxJava.
void Main()
{
AddTwoNumbersAsync (5,4)
.Subscribe(x=>Console.WriteLine(x));
}
IObservable<int> AddTwoNumbersAsync(int a, int b)
{
return Observable.Start(() => AddTwoNumbers(a, b));
}
int AddTwoNumbers(int a, int b)
{
return a + b;
}
Upvotes: 2
Views: 917
Reputation: 11
I would use the Flowable Object from rxJava.
public static void main(String[] args) throws Exception {
Flowable.fromCallable(() -> addTwoNumbersAsync(5, 4))
.subscribe(result -> System.out.println(result));
Thread.sleep(1000);
}
private static int addTwoNumbersAsync(int a, int b) {
return a + b;
}
The method call and the System print will be in a rxJava Thread, and not in the main Thread. You can specify the threadpool on which the Flowable will operate, by adding .subscribeOn(Schedulers.computation())
before the .subscribe(...)
for example.
You can also make a method, which returns the Flowable, which is closer to your original example.
public static void main(String[] args) throws Exception {
addTwoNumbersAsync(5,4)
.subscribe(result -> System.out.println(result));
Thread.sleep(1000);
}
private static Flowable<Integer> addTwoNumbersAsync(int a, int b) {
return Flowable.fromCallable(() -> a+b);
}
Upvotes: 1
Reputation: 16394
You can defer the operation until subscription, and ensure that subscription happens on another thread:
Observable<Integer> sumDeferred = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(addTwoNumbers(5, 4));
}
}).subscribeOn(Schedulers.io());
sumDeferred.subscribe(...);
Upvotes: 5