Atais

Reputation: 11275

Hot observable consume with different frequency

I am working on a simple example for the task I am trying to complete. Let's say there is a list of tasks (tasks) that I want to trigger every 1 second.

That could be done with a scheduler or whatever.

Now there are two consumers of this stream, but

Here is some sample code. Currently, it is not scheduled to repeat - because I do not know if it is better to use Observable.repeat or a Scheduler.

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}

import scala.concurrent.duration._

object MainTest {

  def main(args: Array[String]): Unit = {

    def t = (i: Int) => Observable.eval {
      print(i)
      i
    }

    val tsks = (1 to 5).map(t)

    val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))

    val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
    val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))

    val s = tasks.reduce(_ + _).publish

    s.consumeWith(c1).runAsync
    s.consumeWith(c2).runAsync
    s.connect()

    while (true) {
      Thread.sleep(1.hour.toMillis)
    }
  }

}

Upvotes: 0

Views: 198

Answers (1)

Alexandru Nedelcu

Reputation: 8069

First of all, to repeat a task every 1 second, you can do:

Observable.intervalAtFixedRate(1.second)
  .flatMap(_ => Observable.eval(???))
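To make that concrete, here is a minimal sketch that plugs the five tasks from the question into the interval. It assumes Monix 2.x (where `Task#runAsync` returns a future; in Monix 3.x the equivalent call is `runToFuture`). The names `IntervalSketch`, `round`, `repeated` and `run` are made up for illustration, and `take(...)` is there only so that the demo terminates.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

object IntervalSketch {
  // One round of work: evaluate the five tasks and sum their results,
  // mirroring tasks.reduce(_ + _) from the question.
  def round: Observable[Int] =
    Observable.fromIterable(1 to 5)
      .flatMap(i => Observable.eval(i))
      .reduce(_ + _)

  // Start a new round on every tick of the timer.
  // take(rounds) only bounds the demo; drop it for an endless stream.
  def repeated(period: FiniteDuration, rounds: Int): Observable[Int] =
    Observable.intervalAtFixedRate(period)
      .take(rounds.toLong)
      .flatMap(_ => round)

  // Blocks until the bounded stream completes and returns what it emitted.
  // (Monix 2.x API; with Monix 3.x use runToFuture instead of runAsync.)
  def run(rounds: Int): List[Int] =
    Await.result(repeated(100.millis, rounds).toListL.runAsync, 10.seconds)
}
```

Calling `IntervalSketch.run(3)` collects one sum per tick, so each element of the result is the total of one round.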

For triggering on completion of all tasks, you can use either completed (if you want an Observable[Nothing] which emits only the final completion event) or completedL (if you want to work with a Task[Unit] instead). See the API docs for details.

So instead of your c1 thing, you can do:

s.completedL.runAsync
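As a rough illustration of what `completedL` gives you, the sketch below runs a small cold stream and waits for its completion signal. It again assumes Monix 2.x; `CompletedSketch` and the `seen` counter are hypothetical names used only to show that the elements still flow through the stream even though the resulting task carries no value.

```scala
import java.util.concurrent.atomic.AtomicInteger

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

object CompletedSketch {
  // Runs the stream to completion and reports how many elements passed through.
  def run(): Int = {
    val seen = new AtomicInteger(0)

    val source: Observable[Int] =
      Observable.fromIterable(1 to 5).map { i => seen.incrementAndGet(); i }

    // completedL ignores the elements and yields a Task[Unit]
    // that finishes once the source has completed.
    val done: Task[Unit] = source.completedL

    // Monix 2.x API; with Monix 3.x use runToFuture instead of runAsync.
    Await.result(done.runAsync, 5.seconds)
    seen.get()
  }
}
```

The task itself only tells you that the stream has finished; anything you need from the elements has to be captured before that point, for example with `doOnNext`.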

For sampling the source however, you can work with:

  • sample (alias throttleLast)
  • sampleRepeated
  • throttleFirst
  • debounce
  • debounceRepeated
  • echo
  • echoRepeated

I encourage you to play with these; the API docs are a good place to start.

s.sample(10.seconds).doOnNext(println).completedL.runAsync

Or you can simply take every Nth element with takeEveryNth:

s.takeEveryNth(20).doOnNext(println).completedL.runAsync
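Since `sample` is time-based, its output depends on timing, whereas `takeEveryNth` is purely count-based and easy to try on a finite stream. A small sketch, assuming Monix 2.x and using the made-up names `EveryNthSketch` and `everyNth`:

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable

import scala.concurrent.Await
import scala.concurrent.duration._

object EveryNthSketch {
  // Keeps only every n-th element of the numbers 1 to 100
  // and collects the survivors into a list.
  def everyNth(n: Int): List[Int] =
    Await.result(
      // Monix 2.x API; with Monix 3.x use runToFuture instead of runAsync.
      Observable.fromIterable(1 to 100).takeEveryNth(n).toListL.runAsync,
      5.seconds
    )
}
```

With `n = 20` this keeps the 20th, 40th, 60th, 80th and 100th elements and drops everything in between.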

Let me know if this answers your question.

Upvotes: 2
