Abhijit Sarkar
Abhijit Sarkar

Reputation: 24528

Scala Akka Stream: How to Pass Through a Seq

I'm trying to wrap some blocking calls in Future.The return type is Seq[User] where User is a case class. The following just wouldn't compile with complaints of various overloaded versions being present. Any suggestions? I tried almost all the variations is Source.apply without any luck.

// All I want is Seq[User] => Future[Seq[User]]

def findByFirstName(firstName: String) = {
  val users: Seq[User] = userRepository.findByFirstName(firstName)

  val sink = Sink.fold[User, User](null)((_, elem) => elem)

  val src = Source(users) // doesn't compile

  src.runWith(sink)
}

Upvotes: 2

Views: 6289

Answers (2)

ymonad
ymonad

Reputation: 12090

First of all, I assume that you are using version 1.0 of akka-http-experimental since the API may changed from previous release.

The reason why your code does not compile is that the akka.stream.scaladsl.Source$.apply() requires scala.collection.immutable.Seq instead of scala.collection.mutable.Seq.

Therefore you have to convert from mutable sequence to immutable sequence using to[T] method.

Document: akka.stream.scaladsl.Source

Additionally, as you see the document, Source$.apply() accepts ()=>Iterator[T] so you can also pass ()=>users.iterator as argument.

Since Sink.fold(...) returns the last evaluated expression, you can give an empty Seq() as the first argument, iterate over the users with appending the element to the sequence, and finally get the result.

However, there might be a better solution that can create a Sink which puts each evaluated expression into Seq, but I could not find it.

The following code works.

import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global

case class User(name:String)

object Main extends App{
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    val users = Seq(User("alice"),User("bob"),User("charlie"))

    val sink = Sink.fold[Seq[User], User](Seq())(
      (seq, elem) => 
        {println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})

    val src = Source(users.to[scala.collection.immutable.Seq])
    // val src = Source(()=>users.iterator) // this also works

    val fut = src.runWith(sink) // Future[Seq[User]]
    fut.onSuccess({
      case x=>{
        println(s"result => ${x}")
      }
    })
}

The output of the code above is

elem => User(alice)     | seq => List()
elem => User(bob)       | seq => List(User(alice))
elem => User(charlie)   | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))

Upvotes: 6

al32
al32

Reputation: 109

If you need just Future[Seq[Users]] dont use akka streams but futures

import scala.concurrent._
import ExecutionContext.Implicits.global
val session = socialNetwork.createSessionFor("user", credentials)
val f: Future[List[Friend]] = Future {
  session.getFriends()
}

Upvotes: 3

Related Questions