Reputation: 11275
I am working on a simple example for the task I am trying to complete.
Let's say there is a list of tasks (tasks) that I want to trigger every 1 second. That could be done with a scheduler or something similar.
Now there are two consumers of this stream:
C1 should trigger on completion of all tasks.
C2 should trigger on every n-th completion of all tasks (this can also be every n seconds).
Here is some sample code. Currently, it is not scheduled to repeat, because I do not know whether it is better to use Observable.repeat or a Scheduler.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._

object MainTest {
  def main(args: Array[String]): Unit = {
    // Each "task" prints its index and emits it
    def t = (i: Int) => Observable.eval {
      print(i)
      i
    }

    val tsks = (1 to 5).map(t)
    val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))

    val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
    val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))

    // Share one run of the tasks between both consumers
    val s = tasks.reduce(_ + _).publish
    s.consumeWith(c1).runAsync
    s.consumeWith(c2).runAsync
    s.connect()

    // Keep the JVM alive
    while (true) {
      Thread.sleep(1.hour.toMillis)
    }
  }
}
Upvotes: 0
Views: 198
Reputation: 8069
First of all, to repeat a task every 1 second, you can do:
Observable.intervalAtFixedRate(1.second)
  .flatMap(_ => Observable.eval(???))
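To make that concrete, here is a minimal sketch rather than a definitive implementation. It assumes Monix is on the classpath; the names RepeatSketch, batch and run are made up for illustration. It uses a 100 ms interval and take(ticks) only so that the example terminates quickly, and it collects results through Observable.foreach, which returns a future that can be awaited.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names throughout; only the Monix calls come from the library.
object RepeatSketch {
  // One "batch": the five tasks from the question, reduced to their sum.
  val batch: Observable[Int] =
    Observable.fromIterable(1 to 5)
      .flatMap(i => Observable.eval(i))
      .reduce(_ + _)

  // Runs the whole batch once per tick and collects one sum per tick.
  def run(ticks: Int): List[Int] = {
    val out = ListBuffer.empty[Int]
    val done = Observable.intervalAtFixedRate(100.millis) // 1.second in the real use case
      .take(ticks)                                        // stop after a few ticks (demo only)
      .flatMap(_ => batch)
      .foreach(out += _)
    Await.result(done, 5.seconds)
    out.toList
  }

  def main(args: Array[String]): Unit =
    println(run(3))
}
```

Each tick re-subscribes to batch, so the five tasks are evaluated again on every tick and one sum is emitted per tick.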
For triggering on completion of all tasks, you can use either completed (if you want an Observable[Nothing] which emits only the final completion event) or completedL (if you want to work with a Task[Unit] instead).
See the API docs for details.
So instead of your c1 consumer, you can do:
s.completedL.runAsync
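As a rough illustration of what completed does, the sketch below (assuming Monix on the classpath; CompletionSketch, seen and emitted are hypothetical names) checks that the source is still evaluated while nothing is emitted downstream. It awaits the future returned by Observable.foreach instead of running a Task, purely to keep the example short.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names; a sketch, not the only way to observe completion.
object CompletionSketch {
  def run(): (List[Int], Int) = {
    val seen = ListBuffer.empty[Int] // elements the source actually produced
    var emitted = 0                  // elements that reached the subscriber

    val source = Observable.fromIterable(1 to 5).map { i => seen += i; i }

    // `completed` drops every element and only forwards the completion signal,
    // so the callback below should never be invoked.
    val done = source.completed.foreach(_ => emitted += 1)
    Await.result(done, 5.seconds)

    (seen.toList, emitted)
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

The future finishes only once the source has completed, which is the moment a C1-style action would run.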
For sampling the source however, you can work with:
sample (alias throttleLast)
sampleRepeated
throttleFirst
debounce
debounceRepeated
echo
echoRepeated
I encourage you to play with these, starting with the API docs. For example:
s.sample(10.seconds).doOnNext(println).completedL.runAsync
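Here is a small sketch of time-based sampling, assuming Monix on the classpath. SampleSketch and run are hypothetical names, and the 10 ms and 100 ms periods are arbitrary values chosen so the demo finishes in about half a second. The exact values that come out depend on timing, so the example only relies on there being far fewer of them than the source produced.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names; periods are arbitrary demo values.
object SampleSketch {
  def run(): List[Long] = {
    val out = ListBuffer.empty[Long]
    val done = Observable.intervalAtFixedRate(10.millis) // fast source: 0, 1, 2, ...
      .take(50)                                          // about half a second of data
      .sample(100.millis)                                // keep the latest value per 100 ms window
      .foreach(out += _)
    Await.result(done, 10.seconds)
    out.toList
  }

  def main(args: Array[String]): Unit =
    println(run()) // output varies from run to run
}
```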
Or you can simply take every Nth element with takeEveryNth:
s.takeEveryNth(20).doOnNext(println).completedL.runAsync
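For the "every n-th completion of all tasks" case from the question, a sketch could look like the following. It assumes Monix on the classpath; EveryNthSketch, batch, rounds and n are hypothetical names. The rounds come from a plain range here to keep the output deterministic; in the real setup they would come from the interval-based stream.

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.collection.mutable.ListBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical names; a sketch of a C2-style consumer.
object EveryNthSketch {
  // One "batch": the five tasks from the question, reduced to their sum.
  val batch: Observable[Int] =
    Observable.fromIterable(1 to 5)
      .flatMap(i => Observable.eval(i))
      .reduce(_ + _)

  // Emits (round, sum) for every n-th completed batch.
  def run(rounds: Int, n: Int): List[(Int, Int)] = {
    val out = ListBuffer.empty[(Int, Int)]
    val done = Observable.fromIterable(1 to rounds)
      .flatMap(round => batch.map(sum => (round, sum))) // one element per completed batch
      .takeEveryNth(n)                                  // keep the n-th, 2n-th, 3n-th, ...
      .foreach(out += _)
    Await.result(done, 5.seconds)
    out.toList
  }

  def main(args: Array[String]): Unit =
    println(run(6, 2))
}
```

With six rounds and n = 2, only rounds 2, 4 and 6 reach the consumer.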
Let me know if this answers your question.
Upvotes: 2