brub

Reputation: 1153

RxJava: run Observables on multiple different threads

I'm new to RxJava, so I'm probably missing something very basic. In the code sample below, what I'd like to happen is:

  1. SampleController receives request on http-nio thread

  2. CompositeService processing runs on a new thread A and releases the nio request thread.

  3. CompositeService calls HelloService which makes a network call on thread B.

  4. CompositeService calls WorldService which makes a network call on thread C.

3 and 4 run concurrently, and when the results are ready, we use the results to make a network call on thread A.

Instead what I'm seeing is that 3 and 4 execute sequentially on the http-nio thread, and only the CompositeService executes on a new thread. It appears that my subscribeOn calls in 3 and 4 are not having any effect. How do I get 3 and 4 to run concurrently?

SampleController:

@RestController
@RequestMapping("/rx-java-sample")
public class SampleController {

    private static Logger log = LoggerFactory.getLogger(SampleController.class);

    @Autowired
    private CompositeService compositeService;

    @RequestMapping(method = RequestMethod.GET, 
        produces = { MediaType.APPLICATION_JSON_VALUE })
    public DeferredResult<String> getCompositeString() 
                    throws ApiGatewayException, ApiValidationException {
        log.info("Received getCompositeObject request");

        Observable<String> compositeObject = compositeService.getCompositeString();

        return toDeferredResult(compositeObject);
    }

    private DeferredResult<String> toDeferredResult(Observable<String> compositeObject) {
        DeferredResult<String> result = new DeferredResult<String>();

        compositeObject.subscribeOn(Schedulers.newThread()).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable throwable) {
                result.setErrorResult(throwable);
            }

            @Override
            public void onNext(String compositeString) {
                log.info("Returning compositeObject: " + compositeString);
                result.setResult(compositeString);
            }
        });

        return result;
    }
}

HelloService:

@Service
public class HelloService {

    private Logger log = LoggerFactory.getLogger(HelloService.class);

    public Observable<String> getHello() {
        log.trace("calling get hello");
        return Observable.just(makeNetworkCall());
    }

    private String makeNetworkCall() {
        log.trace("making hello network call");
        return "hello";
    }
}

WorldService:

@Service
public class WorldService {

    private Logger log = LoggerFactory.getLogger(WorldService.class);

    public Observable<String> getWorld() {
        log.trace("calling get world");
        return Observable.just(makeNetworkCall());
    }

    private String makeNetworkCall() {
        log.trace("making world network call");
        return "world";
    }
}

CompositeService:

@Service
public class CompositeService {

    private Logger log = LoggerFactory.getLogger(CompositeService.class);

    @Autowired
    private HelloService helloService;

    @Autowired
    private WorldService worldService;

    public Observable<String> getCompositeString() {
        log.trace("Calling getCompositeObject");

        Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread());
        Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread());

        return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f,b));
    }

    private String makeNetworkCall(String hello, String world) {
        log.trace("making composite network call");
        return hello + " " +  world;
    }
}

log:

2016-06-16 07:10:13 INFO [http-nio-9050-exec-1] [SampleController.java:32] Received getCompositeObject request
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [CompositeService.java:23] Calling getCompositeObject
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:15] calling get hello
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:20] making hello network call
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:15] calling get world
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:20] making world network call
2016-06-16 07:10:13 TRACE [RxNewThreadScheduler-3] [CompositeService.java:32] making composite network call
2016-06-16 07:10:13 INFO [RxNewThreadScheduler-3] [SampleController.java:54] Returning compositeObject: hello world

Upvotes: 1

Views: 696

Answers (2)

Tassos Bassoukos

Reputation: 16142

You want to use Observable::defer for these cases:

  public Observable<String> getWorld() {
      log.trace("calling get world");

      // defer expects a factory that returns an Observable, so wrap the value.
      return Observable.defer(() -> Observable.just(makeNetworkCall()));
  }

This ensures that your code is called each time the observable is subscribed to, on the thread chosen by subscribeOn, rather than once at the moment getWorld() is invoked.

Also, I'd suggest using Schedulers.io(); it's a configurable thread pool that by default expands as needed.
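
The reason Observable.just(makeNetworkCall()) runs on the request thread is ordinary Java argument evaluation: makeNetworkCall() has to return before just() is even entered, so subscribeOn never gets a chance to move it. The sketch below illustrates that with plain JDK types only. It is an analogue, not RxJava: the just and fromCallable helpers here are hypothetical stand-ins written for this example, and the single-thread executor named "worker" plays the role of the scheduler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class EagerVsDeferred {

    // Stand-in for makeNetworkCall(): reports which thread executed it.
    static String makeNetworkCall() {
        return Thread.currentThread().getName();
    }

    // Hypothetical analogue of Observable.just(value): the value already exists.
    static Callable<String> just(String value) {
        return () -> value;
    }

    // Hypothetical analogue of Observable.fromCallable(work): the work is only described.
    static Callable<String> fromCallable(Callable<String> work) {
        return work;
    }

    // Returns { thread that ran the eager call, thread that ran the deferred call }.
    static String[] run() throws Exception {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // makeNetworkCall() is evaluated right here, on the calling thread.
            Callable<String> eager = just(makeNetworkCall());
            // Nothing runs yet; the call happens when the worker invokes it.
            Callable<String> deferred = fromCallable(() -> makeNetworkCall());

            Future<String> eagerResult = worker.submit(eager);
            Future<String> deferredResult = worker.submit(deferred);
            return new String[] { eagerResult.get(), deferredResult.get() };
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] threads = run();
        System.out.println("eager call ran on:    " + threads[0]);
        System.out.println("deferred call ran on: " + threads[1]);
    }
}
```

The eager variant reports the caller's thread even though it was submitted to the worker, which matches the http-nio lines in the question's log. The deferred variant reports "worker".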

Upvotes: 1

brub

Reputation: 1153

It appears the problem was my use of Observable.just rather than Observable.fromCallable. Modifying the service code above as shown below produced the behavior I was looking for. I am still looking for feedback on whether this is the "recommended" way to do it. In particular, I'm not sure whether the use of toBlocking() in CompositeService is correct. I will likely use this pattern extensively in my code and I'd like to get it right.

HelloService:

@Service
public class HelloService {

    private Logger log = LoggerFactory.getLogger(HelloService.class);

    public Observable<String> getHello() {
        log.trace("calling get hello");

        return Observable.fromCallable(() -> { 
            return makeNetworkCall();
        });
    }

    private String makeNetworkCall() {
        log.trace("making hello network call");
        return "hello";
    }
}

WorldService:

@Service
public class WorldService {

    private Logger log = LoggerFactory.getLogger(WorldService.class);

    public Observable<String> getWorld() {
        log.trace("calling get world");

        return Observable.fromCallable(() -> { 
            return makeNetworkCall();
        });
    }

    private String makeNetworkCall() {
        log.trace("making world network call");
        return "world";
    }
}

CompositeService:

@Service
public class CompositeService {

    private Logger log = LoggerFactory.getLogger(CompositeService.class);

    @Autowired
    private HelloService helloService;

    @Autowired
    private WorldService worldService;

    public Observable<String> getCompositeString() {
        return Observable.fromCallable(() -> { 
            return getCompositeStringImpl().toBlocking().single();
        });
    }

    public Observable<String> getCompositeStringImpl() {
        log.trace("Calling getCompositeObject");

        Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread());
        Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread());

        return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f, b));
    }

    private String makeNetworkCall(String hello, String world) {
        log.trace("making composite network call");
        return hello + " " + world;
    }
}

logs:

2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [CompositeService.java:29] Calling getCompositeObject
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [HelloService.java:15] calling get hello
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [WorldService.java:15] calling get world
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-2] [HelloService.java:23] making hello network call
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [WorldService.java:23] making world network call
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [CompositeService.java:38] making composite network call
2016-06-16 08:15:50 INFO [RxNewThreadScheduler-1] [SampleController.java:54] Returning compositeObject: hello world
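
On the toBlocking() question: toBlocking().single() parks the calling thread until the zipped result arrives, so one thread is held just to wait. The sketch below shows the non-blocking shape of the same composition. It swaps in the JDK's CompletableFuture for RxJava purely so it runs without extra dependencies. The class name and the thread names "B" and "C" are assumptions chosen to mirror steps 3 and 4 of the question. In RxJava 1.x, the analogous move would presumably be to wrap getCompositeStringImpl() in Observable.defer instead of fromCallable plus toBlocking, but that variant is not tested here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ConcurrentCompose {

    // Stand-ins for the two service calls; each tags its result with its thread.
    static String hello() {
        return "hello@" + Thread.currentThread().getName();
    }

    static String world() {
        return "world@" + Thread.currentThread().getName();
    }

    // Starts both calls on their own executors and combines the results once
    // both are done. The composing code never waits on either call.
    static CompletableFuture<String> composite(ExecutorService helloPool,
                                               ExecutorService worldPool) {
        CompletableFuture<String> foo =
                CompletableFuture.supplyAsync(ConcurrentCompose::hello, helloPool);
        CompletableFuture<String> bar =
                CompletableFuture.supplyAsync(ConcurrentCompose::world, worldPool);
        return foo.thenCombine(bar, (f, b) -> f + " " + b);
    }

    static String run() throws Exception {
        ExecutorService helloPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "B"));
        ExecutorService worldPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "C"));
        try {
            // Blocking only here, at the outer edge, to print a result in the demo.
            return composite(helloPool, worldPool).get();
        } finally {
            helloPool.shutdown();
            worldPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In a controller, the future returned by composite would be bridged to the DeferredResult with a callback rather than get(), much as the Observer in SampleController does.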

Upvotes: 0
