ramtech
ramtech

Reputation: 797

Query on RxJava observeOn scheduler thread

I have to write into a file based on the incoming requests. As multiple requests may come simultaneously, I don't want multiple threads trying to overwrite the file content together, which may lead into losing some data.

Hence, I tried collecting all the requests' data using a instance variable of PublishSubject. I subscribed publishSubject during init and this subscription will remain throughout the life-cycle of application. Also I'm observing the same instance on a separate thread (provided by Vertx event loop) which invokes the method responsible for writing the file.

private PublishSubject<FileData> publishSubject = PublishSubject.create();

private void init() {
    publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}

Later during request handling, I call onNext as below:

handleRequest() {
   //do some task
   publishSubject.onNext(fileData);
}

I understand that, when I call onNext, the data will be queued up, to be written into the file by the specific thread which was assigned by observeOn operator. However, what I'm trying to understand is

  1. whether this thread gets blocked in WAITING state for only this task? Or,
  2. will it be used for other activities also when no file writing happens? I don't want to end up with one thread from the vertx event loop wasted in waiting state for going with this approach. Also, please suggest any better approach, if available.

Thanks in advance.

Upvotes: 1

Views: 581

Answers (1)

yosriz
yosriz

Reputation: 10267

Actually RxJava will do it for you, by definition onNext() emissions will act in serial fashion:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications. (Observable Contract)

So as long as you will run blocking calls inside the onNext() at the subscriber (and will not fork work to a different thread manually) you will be fine, and no parallel writes will be happen.

Actually, you're worries should come from the opposite direction - Backpressure.
You should choose your backpressure strategy here, as if the requests will come faster then you will process them (writing to file) you might overflow the buffer and get into troubles. (consider using Flowable and choose you're backpressure strategy according to your needs.

Regarding your questions, that depends on the Scheduler, you're using RxHelper.blockingScheduler(vertx) which seems like your custom code, so I can't tell, if the scheduler is using shared thread in work queue fashion then it will not stay idle.
Anyhow, Rx will not determine this for you, the scheduler responsibility is to assign the work to some thread according to its logic.

Upvotes: 1

Related Questions