Matthew Adams
Matthew Adams

Reputation: 2197

How would you change this Akka Streams example to get the materialized value Future[ServerBinding]?

The second example at http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-io.html is the following. I added definitions of host & port.

  val host = "localhost"
  val port = 4444
  val connections: Source[IncomingConnection, Future[ServerBinding]] =
    Tcp().bind(host, port)

  connections runForeach { (connection: IncomingConnection) =>
    println(s"New connection from: ${connection.remoteAddress}")

    val echo = Flow[ByteString]
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
      .map(_.utf8String)
      .map(_ + "!!!\n")
      .map(ByteString(_))

    connection.handleWith(echo)
  }

How would you change the example so that you could also get a hold of the Future[ServerBinding] so that you could log successful binding and handle binding errors if, for example, the port were already in use?

Upvotes: 2

Views: 216

Answers (1)

cmbaxter
cmbaxter

Reputation: 35443

You should be able to get this by moving away from the convenience of runForeach which hides the details of the Sink involved and instead use an explicit Sink. Something like this:

  val host = "localhost"
  val port = 4444
  val connections: Source[IncomingConnection, Future[ServerBinding]] =
    Tcp().bind(host, port)


  val sink = Sink.foreach[IncomingConnection]{ connection =>
    println(s"New connection from: ${connection.remoteAddress}")

    val echo = Flow[ByteString]
      .via(Framing.delimiter(
        ByteString("\n"),
        maximumFrameLength = 256,
        allowTruncation = true))
      .map(_.utf8String)
      .map(_ + "!!!\n")
      .map(ByteString(_))

    connection.handleWith(echo)    
  }

  val serverBinding:Future[ServerBinding] = connections.to(sink).run

Upvotes: 3

Related Questions