Oleg
Oleg

Reputation: 941

How to handle errors and keep an Observable alive in .mapParallelUnordered

I am using Monix 3 and have a kind of this code:

  Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .flatMap(i =>
      if (i % 2 == 0) {   // Bad i
        Observable.empty
      } else
        Observable.pure(i)
    )
    .foreachL(i => print(s"Good i: $i"))   /*Output: Good i: 1
                                                     Good i: 3
                                                     Good i: 5
                                                     Good i: 7
                                                     Good i: 9*/

This code works good, but I have a numerous long while IO operations, so decided to refactor with .mapParallelUnordered:

  Observable.fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
    .mapParallelOrdered(3)(i =>
      if (i % 2 == 0) {
        Task.raiseError(new Exception(s"Bad i: $i"))
      } else
        Task.pure(i)
    )
    .foreachL(i => print(s"Good i: $i"))    /*Output: Good i: 1*/

I am trying to get the same result, as in the first example, but in parallel processing. The problem is Task.raiseError kills a whole observable, so it stops on i = 2.

How to handle errors and keep an Observable alive?

Upvotes: 1

Views: 128

Answers (1)

atl
atl

Reputation: 326

You can try this:

Observable
  .fromIterable(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9))
  .mapParallelOrdered(3)(
    i =>
      if (i % 2 == 0) {
        Task.pure(Left("Error"))
      } else
        Task.pure(Right(i))
  )
  .collect {
    case Right(i) => i
  }
  .foreachL(i => print(s"Good i: $i"))

Upvotes: 3

Related Questions