davidbaguetta
davidbaguetta

Reputation: 532

RxJava: Thread pool for network calls

I'm trying to create a mechanism for limiting the number of concurrent network requests. The idea is that I want to have a fixed thread pool of, say, 20 threads, and use that pool to only allow a maximum of 20 outgoing HTTP requests.

What i've been trying to do is:

public class HttpClient {
  private final Scheduler scheduler;

  public HttpClient(int maxRequests) {
    this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
  }

  public Single<...> request() {
    return this.httpRequest()
      .subscribeOn(this.scheduler);
  }

  // sends the http request and returns a response
  private Single<...> httpRequest() {
    return ...
  }
}

But this is not working. I've tried setting the maxRequests to just 1, sending 5 requests, and then setting a breakpoint on the server that is receiving the requests on purpose to keep the first request "stuck" there, in order to see if the other 4 wait for an available thread. But all 5 of them execute, and after a while, I just get a timeout exception on all 5 requests.

I tried using observeOn too, but it didn't work either.

EDIT: I also tried to implement a Semaphore logic with the following code:


public HttpClient(int maxRequests) {
  this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}

public Single<...> request() {
  return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
   .andThen(this.httpRequest())
   .doFinally(concurrentRequestsSemaphore::release);
}

Where Semaphore is the native Java implementation of a semaphore. This mechanic worked as expected where if the maxRequests was 2, and I sent 5 requests, 2 would go out and the other 3 would get stuck inside fromAction waiting. But this approach came with other unexpected behaviors, such as the fact that even after the 2 requests received a response, none of the other 3 were executed because the .doFinally(concurrentRequestsSemaphore::release) never got executed. I did some tests, and it only executed after X requests had received a response. And it was completely unpredictable what X was going to be. So there could be a semaphore of 20 permits, 20 requests would go out and get back a response, and no other would get executed because the semaphore was never released by any request.

Upvotes: 0

Views: 460

Answers (1)

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13525

You did not show the body of private Single<...> httpRequest(). I assume you call there some asynchronous method. Asynchronous methods occupy threads only to handle response, and when the request itself is moving to the server and back, no one thread is used. This explains why you see all 5 requests arrived at the server. Usually, to limit the number of activities of some kind, java.util.concurrent.Semaphores are used, but they limit activities by blocking a thread. Logically, since your program is asynchronous, you need to use asynchronous semaphore, but it is a rare beast. So you have following options:

  • do not limit the number of asynchronous http requests at all, as they do not take much resources anyway
  • launch a special thread which aquires permits from ordinary synchronous Semaphore, and then launches asynchronous http requests. The semaphore is released when a request is fully completed
  • use synchronous launch of http requests with fixed thread pool
  • use asynchronous semaphore. The only implementations I know are in my library DF4J: AsyncSemaphore is an extension of the standard Semaphore and so has both synchronous and asynchronous interface, and InpSignal used only in asynchronous programs. An example of InpSignal usage is at AsyncServerSocketChannel.java, where it is used to limit the number of opened client connections in an Echo Server implementation.

Upvotes: 1

Related Questions