ZhekaKozlov

Reputation: 39536

RxJava thread-safety

Is this code thread-safe?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do something with the sequence of lists
    ...
  });

I'm curious because ArrayList is not a thread-safe data structure.

Upvotes: 9

Views: 3081

Answers (2)

Ana Betts

Reputation: 74654

If this code isn't thread-safe, then either RxJava is broken or your source Observable is broken. The Rx contract requires that notifications be delivered serially, so operators never have to be re-entrant.

Upvotes: 4

Lee Campbell

Reputation: 10783

As a quick answer, in .NET (the original Rx implementation) all values from an observable sequence can be assumed to arrive sequentially. This does not preclude the sequence from being multi-threaded. However, if you are producing values from several threads at once, you may want to enforce the sequential nature by looking for the RxJava equivalent of the .NET Synchronize() Rx operator.

Another option is to check the implementation of scan in the RxJava source code, to confirm that it enforces the sequential delivery you would want or expect, which is what makes your accumulator function safe.
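To make the point about sequential delivery concrete, here is a minimal plain-Java sketch of what serialization buys you. It does not use RxJava and is not how RxJava implements scan: ScanStage, run and the "t0-0" style values are hypothetical names made up for illustration. The synchronized keyword stands in for whatever serialization the source or an operator provides (in RxJava the counterpart of Synchronize() is, as far as I know, serialize()). The accumulator is the same non-thread-safe ArrayList as in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class SerializedScanSketch {

    /** Hypothetical stand-in for a scan stage: folds each value into one accumulator. */
    static final class ScanStage<T, R> {
        private final BiFunction<R, T, R> accumulator;
        private R state;

        ScanStage(R seed, BiFunction<R, T, R> accumulator) {
            this.state = seed;
            this.accumulator = accumulator;
        }

        // synchronized plays the role of serialization: only one onNext runs at a
        // time, and the lock makes each thread's writes visible to the next caller.
        synchronized void onNext(T value) {
            state = accumulator.apply(state, value);
        }

        synchronized R current() {
            return state;
        }
    }

    /** Starts `threads` producers pushing `perThread` values each; returns the accumulated list. */
    public static List<String> run(int threads, int perThread) {
        ScanStage<String, List<String>> stage =
            new ScanStage<String, List<String>>(new ArrayList<String>(), (acc, next) -> {
                acc.add(next); // same accumulator body as in the question
                return acc;
            });

        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stage.onNext("t" + id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait until every producer has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return stage.current();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000).size()); // prints 4000
    }
}
```

If you drop synchronized from onNext, the same run can lose elements or throw from inside ArrayList.add, which is the failure you would see if a source broke the contract and called onNext concurrently. Note that every step still hands out the same list instance, so a subscriber that keeps a reference will see later additions too.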

Upvotes: 6
