jesus g_force Harris

Reputation: 534

Stream Future in Play 2.5

Once again I am attempting to update some pre-Play 2.5 code (based on this vid). For example, this used to be the way to stream a Future:

  Ok.chunked(Enumerator.generateM(Promise.timeout(Some("hello"), 500)))

I have written the following method as a workaround for the deprecated Promise.timeout, using the Akka scheduler:

  private def keepResponding(data: String, delay: FiniteDuration, interval: FiniteDuration): Future[Result] = {
    val promise: Promise[Result] = Promise[Result]()
    actorSystem.scheduler.schedule(delay, interval) { promise.success(Ok(data)) }
    promise.future
  }

According to the Play Framework Migration Guide, Enumerators should be rewritten as a Source, and Source.unfoldAsync is apparently the equivalent of Enumerator.generateM, so I was hoping that this would work (where str is a Future[Result]):

  def inf = Action { request =>
    val str = keepResponding("stream me", 1.second, 2.second)

    Ok.chunked(Source.unfoldAsync(str))
  }

Of course I'm getting a type mismatch error, and when looking at the class signature of UnfoldAsync:

final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]])

I can see that the parameters are not correct but I'm not fully understanding what/how I should pass this through.

Upvotes: 2

Views: 291

Answers (1)

Stefano Bonetti

Reputation: 9023

unfoldAsync is even more generic than Play!'s own generateM, as it lets you thread a state value (S) through each step. This means the emitted value can depend on the previously emitted value(s).

The example below will load values by an increasing id, until the loading fails:

val source: Source[String, NotUsed] = Source.unfoldAsync(0){ id ⇒
  loadFromId(id)
    .map(s ⇒ Some((id + 1, s)))
    .recover{case _ ⇒ None}
}

def loadFromId(id: Int): Future[String] = ???
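
To make the S ⇒ Future[Option[(S, E)]] contract concrete without pulling in Akka, here is a rough model of it using only the Scala standard library. It is a sketch of the semantics, not how Akka implements unfoldAsync: unfoldToList is a hypothetical helper that collects into a List what a Source would emit, and loadFromId is a made-up loader that succeeds for ids 0 to 2 and fails afterwards.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnfoldModel {
  // Hypothetical helper (not part of Akka): starting from `state`, keep calling `f`.
  // Some((next, elem)) emits `elem` and continues with `next`; None ends the unfold.
  def unfoldToList[S, E](state: S)(f: S => Future[Option[(S, E)]])(
      implicit ec: ExecutionContext): Future[List[E]] =
    f(state).flatMap {
      case Some((next, elem)) => unfoldToList(next)(f).map(elem :: _)
      case None               => Future.successful(Nil)
    }

  // Made-up loader for illustration: ids 0, 1 and 2 exist, everything else fails.
  def loadFromId(id: Int): Future[String] =
    if (id < 3) Future.successful(s"item-$id")
    else Future.failed(new NoSuchElementException(s"no item $id"))

  def run(): List[String] = {
    import ExecutionContext.Implicits.global
    // Same shape as the unfoldAsync call above: the state is the next id to load,
    // and a failed load is turned into None, which stops the unfold.
    val all = unfoldToList[Int, String](0) { id =>
      loadFromId(id)
        .map(s => Some((id + 1, s)))
        .recover { case _ => None }
    }
    Await.result(all, 5.seconds)
  }
}
```

Calling UnfoldModel.run() walks the ids 0, 1, 2, then stops when the load for id 3 fails and is recovered to None.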

In your case no internal state is really needed, so you can pass a dummy value such as NotUsed wherever a state is required, e.g.

val source: Source[Result, NotUsed] = Source.unfoldAsync(NotUsed) { _ ⇒
  schedule("stream me", 2.seconds).map(x ⇒ Some(NotUsed → x))
}

def schedule(data: String, delay: FiniteDuration): Future[Result] = {
  akka.pattern.after(delay, system.scheduler){Future.successful(Ok(data))}
}

Note that your original implementation of keepResponding is incorrect: a Promise can be completed only once, so the second scheduled call to promise.success throws an IllegalStateException. Akka's after pattern offers a simpler way to achieve what you need.
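
The single-completion rule can be checked in isolation with the Scala standard library alone (no Akka or Play needed). This is only a small sketch; the object name PromiseCompletesOnce and the method demo are made up for illustration.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

object PromiseCompletesOnce {
  // Returns (first completion succeeded, second completion threw
  // IllegalStateException, result of trySuccess on the completed promise).
  def demo(): (Boolean, Boolean, Boolean) = {
    val promise = Promise[String]()

    // The first completion is accepted.
    val first = Try(promise.success("first")).isSuccess

    // Completing an already completed Promise throws IllegalStateException.
    val secondThrew = Try(promise.success("second")).failed.toOption
      .exists(_.isInstanceOf[IllegalStateException])

    // trySuccess does not throw; it reports false when the promise is already completed.
    val tryAgain = promise.trySuccess("third")

    // The future keeps the value from the first completion.
    assert(promise.future.value == Some(Success("first")))

    (first, secondThrew, tryAgain)
  }
}
```

In keepResponding the same thing would happen on the second tick of the scheduler, which is why a single Promise cannot back a repeating stream of results.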

However, note that in your specific case, Akka Streams offers a more idiomatic solution with Source.tick:

val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, NotUsed).mapAsync(1){ _ ⇒
  loadSomeFuture()
}

def loadSomeFuture(): Future[String] = ???

or, even simpler, if you don't actually need asynchronous computation (as in your example):

val source: Source[String, Cancellable] = Source.tick(1.second, 2.seconds, "stream me")

Upvotes: 1
