Reputation: 5763
I have a function get: T => scala.concurrent.Future[T]
I want to iterate it like this:
val futs: Iterator[Future[T]] = Iterator.iterate(get(init)){
_.flatMap(prev => get(prev))
}
But the element type of the iterator is Future[T], so it is not easy to process this iterator.
How could I convert it to a Process[?, T] (maybe with T => Future[T] as the context type F)?
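To illustrate the difficulty described above, here is a minimal sketch using only scala.concurrent. The Int-based get and the names IterateFutures, futs and firstN are made up for the illustration. A fixed-size prefix of the iterator can be collected with Future.sequence, but a value-dependent condition such as "stop once the result reaches 100000" cannot be expressed on Iterator[Future[Int]] without blocking on each element.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IterateFutures {
  // Hypothetical stand-in for the `get` function from the question.
  def get(i: Int): Future[Int] = Future(i + 1)

  // Each element is chained onto the previous one with flatMap.
  def futs(init: Int): Iterator[Future[Int]] =
    Iterator.iterate(get(init))(_.flatMap(get))

  // Take a finite prefix, then turn List[Future[Int]] into Future[List[Int]].
  def firstN(init: Int, n: Int): Future[List[Int]] =
    Future.sequence(futs(init).take(n).toList)

  def main(args: Array[String]): Unit =
    println(Await.result(firstN(0, 5), 5.seconds)) // List(1, 2, 3, 4, 5)
}
```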
Upvotes: 4
Views: 427
Reputation: 5763
I came up with another solution:
def iterate[F[_], A](init: A)(f: A => F[A]): Process[F, A] = {
  Process.emit(init) ++ Process.await(f(init)) { next => iterate(next)(f) }
}
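To see the recursion in that definition without the scalaz-stream machinery, here is a rough analogue written with plain scala.concurrent futures. It is only a sketch: iterateWhile and its predicate parameter are hypothetical names, and it collects the results eagerly into a Vector instead of producing a lazy Process. The shape is the same, though: keep the current value, wait for f(current), then recurse on the next value.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IterateSketch {
  // Hypothetical helper: behaves like iterate(init)(f).takeWhile(p), but eager.
  def iterateWhile[A](init: A)(f: A => Future[A])(p: A => Boolean): Future[Vector[A]] = {
    def loop(current: A, acc: Vector[A]): Future[Vector[A]] =
      if (!p(current)) Future.successful(acc)                      // stop at the first value failing p
      else f(current).flatMap(next => loop(next, acc :+ current))  // keep current, recurse on next
    loop(init, Vector.empty)
  }

  def main(args: Array[String]): Unit = {
    val result = iterateWhile(0)(i => Future(i + 1))(_ < 5)
    println(Await.result(result, 5.seconds)) // Vector(0, 1, 2, 3, 4)
  }
}
```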
This iterate function is already a feature of scalaz-stream 0.6; see this PR for details.
In order to use scala.concurrent.Future as the context type F, we need to import scalaz.std.scalaFuture._ and provide a Catchable instance:
implicit def futureCatchable(implicit ctx: ExecCtx): Catchable[Future] = {
  new Catchable[Future] {
    def attempt[A](f: Future[A]) = f.map(\/-(_)).recover { case e => -\/(e) }
    def fail[A](err: Throwable) = Future.failed(err)
  }
}
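As a way to check what attempt does to a successful and to a failed Future, here is a small stdlib-only sketch that swaps scalaz's \/ for scala.util.Either. The object AttemptSketch is hypothetical and is not part of the Catchable instance above; it only mirrors the map/recover pattern.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AttemptSketch {
  // Hypothetical analogue of Catchable#attempt, with Either in place of \/.
  def attempt[A](f: Future[A]): Future[Either[Throwable, A]] =
    f.map(a => Right(a): Either[Throwable, A]) // success becomes Right
      .recover { case e => Left(e) }           // failure becomes Left

  def main(args: Array[String]): Unit = {
    val ok = Await.result(attempt(Future(1)), 5.seconds)
    val failed =
      Await.result(attempt(Future[Int](throw new RuntimeException("boom"))), 5.seconds)
    println(ok)            // Right(1)
    println(failed.isLeft) // true
  }
}
```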
Finally I got this:
package stream

import scala.concurrent._
import scalaz._
import scalaz.stream._

package object future {
  type ExecCtx = ExecutionContext

  def iterate[F[_], A](init: A)(f: A => F[A]): Process[F, A] = {
    Process.emit(init) ++ Process.await(f(init)) { next => iterate(next)(f) }
  }

  implicit def futureCatchable(implicit ctx: ExecCtx): Catchable[Future] = {
    new Catchable[Future] {
      def attempt[A](f: Future[A]) = f.map(\/-(_)).recover { case e => -\/(e) }
      def fail[A](err: Throwable) = Future.failed(err)
    }
  }
}

object futureApp extends App {
  import scalaz.Scalaz._
  import future._
  import scala.concurrent.ExecutionContext.Implicits.global

  def get(i: Int) = Future {
    println(i + 1)
    i + 1
  }

  iterate(0)(get).takeWhile(_ < 100000).run
}
Upvotes: 1
Reputation: 9744
I finally understood what Pavel Chlupacek meant. Signal looks cool, but it is a little cryptic for a beginner.
import scala.concurrent.{Future => SFuture}
import scala.language.implicitConversions
import scalaz.concurrent.Task
import scalaz.stream._
import scala.concurrent.ExecutionContext.Implicits.global
implicit class Transformer[+T](fut: => SFuture[T]) {
  def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
    import scala.util.{Failure, Success}
    import scalaz.syntax.either._
    Task.async { register =>
      fut.onComplete {
        case Success(v) => register(v.right)
        case Failure(ex) => register(ex.left)
      }
    }
  }
}
val init: Int = 0
def f(i: Int): SFuture[Int] = SFuture(i + 1)
val signal = scalaz.stream.async.signal[Int]
// Observe values and push them to the signal
val signalSink: Process[Task, Int => Task[Unit]] = // =:= Sink[Task, Int]
Process.constant((input: Int) => signal.set(input))
// Start from init and then consume from signal
val result = (Process.eval(f(init).toTask) ++ signal.discrete.evalMap(i => f(i).toTask)) observe signalSink
println(result.take(10).runLog.run)
Upvotes: 2
Reputation: 864
Assuming you know how to convert Future -> Task (either via an implicit or via Process.transform), this should work:
def get(t:T): Task[T] = ???
val initial : T = ???
val signal = scalaz.stream.async.signal[T]
// emit the initial value, followed by any change of `T` within the signal
val source:Process[Task,T] = eval_(signal.set(initial)) fby signal.discrete
// sink to update `T` within the signal
val signalSink:Sink[Task,T] = constant((t:T) => signal.set(t))
// result, which essentially converts T => Task[T] into Process[Task,T]
val result: Process[Task,T] = source.observe(signalSink)
Upvotes: 2
Reputation: 9744
Not a super nice solution, but it works:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future => SFuture}
import scala.language.implicitConversions
import scalaz.concurrent.Task
import scalaz.stream._
implicit class Transformer[+T](fut: => SFuture[T]) {
  def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = {
    import scala.util.{Success, Failure}
    import scalaz.syntax.either._
    Task.async { register =>
      fut.onComplete {
        case Success(v) => register(v.right)
        case Failure(ex) => register(ex.left)
      }
    }
  }
}
val init: Int = 0
def f(i: Int): SFuture[Int] = SFuture(i + 1)
val p = Process.repeatEval[Task, Int] {
  var prev = init
  f(prev).toTask.map(next => { prev = next; next })
}
println(p.take(10).runLog.run)
Upvotes: 2