Reputation: 24528
I'm trying to wrap some blocking calls in Future
.The return type is Seq[User]
where User
is a case class
. The following just wouldn't compile with complaints of various overloaded versions being present. Any suggestions? I tried almost all the variations is Source.apply
without any luck.
// All I want is Seq[User] => Future[Seq[User]]
def findByFirstName(firstName: String) = {
val users: Seq[User] = userRepository.findByFirstName(firstName)
val sink = Sink.fold[User, User](null)((_, elem) => elem)
val src = Source(users) // doesn't compile
src.runWith(sink)
}
Upvotes: 2
Views: 6289
Reputation: 12090
First of all, I assume that you are using version 1.0 of akka-http-experimental
since the API may changed from previous release.
The reason why your code does not compile is that the akka.stream.scaladsl.Source$.apply()
requires
scala.collection.immutable.Seq
instead of scala.collection.mutable.Seq
.
Therefore you have to convert from mutable sequence to immutable sequence using to[T]
method.
Document: akka.stream.scaladsl.Source
Additionally, as you see the document, Source$.apply()
accepts ()=>Iterator[T]
so you can also pass ()=>users.iterator
as argument.
Since Sink.fold(...)
returns the last evaluated expression, you can give an empty Seq()
as the first argument, iterate over the users
with appending the element to the sequence, and finally get the result.
However, there might be a better solution that can create a Sink
which puts each evaluated expression into Seq
, but I could not find it.
The following code works.
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global
case class User(name:String)
object Main extends App{
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
val users = Seq(User("alice"),User("bob"),User("charlie"))
val sink = Sink.fold[Seq[User], User](Seq())(
(seq, elem) =>
{println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})
val src = Source(users.to[scala.collection.immutable.Seq])
// val src = Source(()=>users.iterator) // this also works
val fut = src.runWith(sink) // Future[Seq[User]]
fut.onSuccess({
case x=>{
println(s"result => ${x}")
}
})
}
The output of the code above is
elem => User(alice) | seq => List()
elem => User(bob) | seq => List(User(alice))
elem => User(charlie) | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))
Upvotes: 6