GrowinMan

Reputation: 4907

Does the onNext callback passed to RxJava's Observable.subscribe() have to use thread-safe objects?

This is what my code looks like:

ArrayList<T> result = new ArrayList<T>();
observable.subscribe(result::add, <something here>, <something here>);

Is this not recommended? Should result be a thread-safe structure?

Upvotes: 0

Views: 408

Answers (2)

Bob Dalgleish

Reputation: 8227

It is important that you understand what thread safety means.

One type of thread safety arises from compartmentalization: operations on an object are only ever performed in one thread. In that case, the object you are working on does not have special thread-safety issues.

The other type of thread safety comes about when you use synchronized, volatile, etc., to ensure the serial nature of access and modification.
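As a rough illustration of this second kind of thread safety, here is a minimal sketch in plain JDK Java (no RxJava involved). The class name SynchronizedAccessSketch and the method fill are made up for this example; the point is only that a synchronized wrapper serializes concurrent add() calls on a shared list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical example class, not part of RxJava.
class SynchronizedAccessSketch {

    // Adds perThread items from each of `threads` threads into one shared list
    // and returns the final size.
    static int fill(int threads, int perThread) {
        // synchronizedList guards every call with a lock, so concurrent add() calls are serialized.
        List<Integer> result = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    result.add(i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // wait for every writer to finish before reading
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.size();
    }

    public static void main(String[] args) {
        System.out.println(fill(4, 1000)); // prints 4000
    }
}
```

With a bare ArrayList in place of the synchronized wrapper, the same code could lose elements or throw, because add() would then run concurrently without any coordination.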

In your example, result is not thread-safe, but we can't tell if it is properly compartmentalised, making thread-safety a non-issue. Examples of safety violations:

  • your observable could have values emitted through it from different threads. You need to make sure you are using a SerializedSubject or similar to ensure that the observable contract is maintained.
  • any code that uses result while the observer chain is active runs the risk of making the use unsafe.

Here is how to compartmentalize the access to result:

Scheduler myScheduler; // a known, single-threaded scheduler
...
List<String> result;
...
observable
  .observeOn( myScheduler )
  .subscribe( result::add, err -> {},
     () -> { // do end-of-processing here
   });

Using observeOn() will ensure a single thread is responsible for adding elements to result.

To access the contents of result, you need to either a) wait until the observer chain is done, which can be signalled in the onCompleted() processing, or b) run on the same thread as the observer chain.
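Option a) can be sketched without RxJava, using a plain JDK single-threaded executor as a stand-in for the single-threaded scheduler. This is only an analogy under that assumption: ConfinedListSketch and collect are hypothetical names, the submitted tasks play the role of onNext, and the final task plays the role of onCompleted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class; the executor stands in for a single-threaded Scheduler.
class ConfinedListSketch {

    static List<Integer> collect(int count) {
        List<Integer> result = new ArrayList<>(); // deliberately not thread-safe
        ExecutorService single = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            for (int i = 0; i < count; i++) {
                final int value = i;
                // Analogous to onNext: every add() runs on the executor's one thread.
                single.execute(() -> result.add(value));
            }
            // Analogous to onCompleted: signal that no more writes will happen.
            single.execute(done::countDown);
            // The caller does not touch result until completion has been signalled.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            single.shutdown();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(collect(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

The list is only ever written from one thread, and the latch provides the hand-off that makes those writes visible to the reader, so no synchronized collection is needed.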

Upvotes: 1

Swatarianess

Reputation: 31

I have used subscribe() with non-thread-safe objects and have had little to no trouble. RxJava's Observable has additional methods that I recommend looking into when it comes to maintaining good concurrency practices.

Upvotes: 0
