Reputation: 1153
I'm new to RxJava, so I'm probably missing something very basic. In the code sample below, what I'd like to happen is:
1. SampleController receives the request on an http-nio thread.
2. CompositeService processing runs on a new thread A and releases the nio request thread.
3. CompositeService calls HelloService, which makes a network call on thread B.
4. CompositeService calls WorldService, which makes a network call on thread C.
5. Steps 3 and 4 run concurrently, and when the results are ready, we use them to make a network call on thread A.
Instead, what I'm seeing is that steps 3 and 4 execute sequentially on the http-nio thread, and only the CompositeService network call executes on a new thread. It appears that my subscribeOn calls for steps 3 and 4 are not having any effect. How do I get steps 3 and 4 to run concurrently?
SampleController:
@RestController
@RequestMapping("/rx-java-sample")
public class SampleController {

    private static Logger log = LoggerFactory.getLogger(SampleController.class);

    @Autowired
    private CompositeService compositeService;

    @RequestMapping(method = RequestMethod.GET,
            produces = { MediaType.APPLICATION_JSON_VALUE })
    public DeferredResult<String> getCompositeString()
            throws ApiGatewayException, ApiValidationException {
        log.info("Received getCompositeObject request");
        Observable<String> compositeObject = compositeService.getCompositeString();
        return toDeferredResult(compositeObject);
    }

    private DeferredResult<String> toDeferredResult(Observable<String> compositeObject) {
        DeferredResult<String> result = new DeferredResult<String>();
        compositeObject.subscribeOn(Schedulers.newThread()).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable throwable) {
                result.setErrorResult(throwable);
            }

            @Override
            public void onNext(String compositeString) {
                log.info("Returning compositeObject: " + compositeString);
                result.setResult(compositeString);
            }
        });
        return result;
    }
}
HelloService:
@Service
public class HelloService {

    private Logger log = LoggerFactory.getLogger(HelloService.class);

    public Observable<String> getHello() {
        log.trace("calling get hello");
        return Observable.just(makeNetworkCall());
    }

    private String makeNetworkCall() {
        log.trace("making hello network call");
        return "hello";
    }
}
WorldService:
@Service
public class WorldService {

    // Logger named after this class (the original passed HelloService.class here)
    private Logger log = LoggerFactory.getLogger(WorldService.class);

    public Observable<String> getWorld() {
        log.trace("calling get world");
        return Observable.just(makeNetworkCall());
    }

    private String makeNetworkCall() {
        log.trace("making world network call");
        return "world";
    }
}
CompositeService:
@Service
public class CompositeService {

    private Logger log = LoggerFactory.getLogger(CompositeService.class);

    @Autowired
    private HelloService helloService;

    @Autowired
    private WorldService worldService;

    public Observable<String> getCompositeString() {
        log.trace("Calling getCompositeObject");
        Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread());
        Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread());
        return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f, b));
    }

    private String makeNetworkCall(String hello, String world) {
        log.trace("making composite network call");
        return hello + " " + world;
    }
}
log:
2016-06-16 07:10:13 INFO [http-nio-9050-exec-1] [SampleController.java:32] Received getCompositeObject request
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [CompositeService.java:23] Calling getCompositeObject
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:15] calling get hello
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:20] making hello network call
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:15] calling get world
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:20] making world network call
2016-06-16 07:10:13 TRACE [RxNewThreadScheduler-3] [CompositeService.java:32] making composite network call
2016-06-16 07:10:13 INFO [RxNewThreadScheduler-3] [SampleController.java:54] Returning compositeObject: hello world
Upvotes: 1
Views: 696
Reputation: 16142
You want to use Observable::defer for these cases:
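public Observable<String> getWorld() {
    log.trace("calling get world");
    // defer expects a factory that returns an Observable, so wrap the value in just()
    return Observable.defer(() -> Observable.just(makeNetworkCall()));
}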
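This ensures that your code gets called each time the observable is subscribed to, on the thread chosen by subscribeOn, rather than once on the calling thread when the Observable is created.
Also, I'd suggest using Schedulers.io(); it's a configurable thread pool that by default expands as needed.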
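To make the eager-versus-deferred difference concrete, here is a minimal plain-JDK sketch. It does not use RxJava: the `just` helper, the `EagerVsDeferred` class, and the `worker-1` thread name are hypothetical stand-ins chosen for illustration. It only shows the underlying Java behaviour: an argument expression is evaluated on the caller's thread before the method receives it, while a lambda body runs on whichever thread eventually invokes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class EagerVsDeferred {

    // Hypothetical stand-in for makeNetworkCall(): reports the thread it ran on.
    static String makeNetworkCall() {
        return Thread.currentThread().getName();
    }

    // Stand-in for the "just" style: the value was already computed by the caller.
    static Supplier<String> just(String value) {
        return () -> value;
    }

    // Returns [thread seen by the eager call, thread seen by the deferred call].
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            // makeNetworkCall() executes right here, on the calling thread.
            Supplier<String> eager = just(makeNetworkCall());
            // makeNetworkCall() executes only when get() is invoked later.
            Supplier<String> deferred = () -> makeNetworkCall();

            Callable<String> eagerTask = eager::get;
            Callable<String> deferredTask = deferred::get;

            List<String> seen = new ArrayList<>();
            seen.add(worker.submit(eagerTask).get());
            seen.add(worker.submit(deferredTask).get());
            return seen;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        // When launched from the main thread this prints: [main, worker-1]
        System.out.println(run());
    }
}
```

Both suppliers are read on the worker thread, yet only the deferred one does its work there. That is the same shape as the question's log, where the "network calls" ran on the http-nio thread even though the observables were later subscribed on new threads.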
Upvotes: 1
Reputation: 1153
It appears the problem was my use of Observable.just rather than Observable.fromCallable: Observable.just(makeNetworkCall()) evaluates makeNetworkCall() immediately on the calling thread, before any subscription happens. Changing the service code as shown below produced the behavior I was looking for. I am still looking for feedback on whether this is the "recommended" way to do it. In particular, I'm not sure whether the use of toBlocking() in CompositeService is correct. I will likely use this pattern extensively in my code, and I'd like to get it right.
HelloService:
@Service
public class HelloService {

    private Logger log = LoggerFactory.getLogger(HelloService.class);

    public Observable<String> getHello() {
        log.trace("calling get hello");
        return Observable.fromCallable(() -> {
            return makeNetworkCall();
        });
    }

    private String makeNetworkCall() {
        log.trace("making hello network call");
        return "hello";
    }
}
WorldService:
@Service
public class WorldService {

    // Logger named after this class (the original passed HelloService.class here)
    private Logger log = LoggerFactory.getLogger(WorldService.class);

    public Observable<String> getWorld() {
        log.trace("calling get world");
        return Observable.fromCallable(() -> {
            return makeNetworkCall();
        });
    }

    private String makeNetworkCall() {
        log.trace("making world network call");
        return "world";
    }
}
CompositeService:
@Service
public class CompositeService {

    private Logger log = LoggerFactory.getLogger(CompositeService.class);

    @Autowired
    private HelloService helloService;

    @Autowired
    private WorldService worldService;

    public Observable<String> getCompositeString() {
        return Observable.fromCallable(() -> {
            return getCompositeStringImpl().toBlocking().single();
        });
    }

    public Observable<String> getCompositeStringImpl() {
        log.trace("Calling getCompositeObject");
        Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread());
        Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread());
        return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f, b));
    }

    private String makeNetworkCall(String hello, String world) {
        log.trace("making composite network call");
        return hello + " " + world;
    }
}
logs:
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [CompositeService.java:29] Calling getCompositeObject
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [HelloService.java:15] calling get hello
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [WorldService.java:15] calling get world
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-2] [HelloService.java:23] making hello network call
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [WorldService.java:23] making world network call
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [CompositeService.java:38] making composite network call
2016-06-16 08:15:50 INFO [RxNewThreadScheduler-1] [SampleController.java:54] Returning compositeObject: hello world
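On the toBlocking() question, one thing worth checking is whether the blocking wait is needed at all: in the logs above, RxNewThreadScheduler-1 does nothing but wait for the other two threads. The sketch below is a plain-JDK analogy, not RxJava code. It assumes `CompletableFuture.thenCombine` as a stand-in for `Observable.zip`, and the `NonBlockingCompose` class and its `hello`/`world` methods are hypothetical. It shows two calls started concurrently and combined without any thread parked in between. In RxJava terms, the analogous move might be to return the zipped Observable directly, or to wrap its creation in `Observable.defer`, but treat that as a suggestion to verify rather than a confirmed recommendation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NonBlockingCompose {

    // Hypothetical stand-ins for the two service calls.
    static String hello() {
        return "hello";
    }

    static String world() {
        return "world";
    }

    // Starts both calls on the pool and combines them once both have completed.
    // Nothing in this method waits for a result.
    static CompletableFuture<String> composite(ExecutorService pool) {
        CompletableFuture<String> foo =
                CompletableFuture.supplyAsync(NonBlockingCompose::hello, pool);
        CompletableFuture<String> bar =
                CompletableFuture.supplyAsync(NonBlockingCompose::world, pool);
        return foo.thenCombine(bar, (f, b) -> f + " " + b);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // join() blocks only here, at the edge of the program, to print the result.
            System.out.println(composite(pool).join()); // prints "hello world"
        } finally {
            pool.shutdown();
        }
    }
}
```

The design point is that the only blocking call sits at the outermost edge. In the controller from the question, that edge is already covered by DeferredResult, which is completed from the subscriber callback.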
Upvotes: 0