lucataglia

Reputation: 792

Akka Stream, Tcp().bind, handle when the client closes the socket

I'm quite new to Akka Streams and I'd like to learn how to handle a TCP socket for a project of mine. I took this piece of code from the official Akka Streams documentation.

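import akka.stream.scaladsl.{Flow, Framing, Source, Tcp}
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.util.ByteString
import scala.concurrent.Future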

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

If I connect from the terminal using netcat, I can see that the Akka Streams TCP socket works as expected. I also found out that if I need to close the connection in response to a user message, I can use takeWhile as follows:

import akka.stream.scaladsl.Framing

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)

connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // < - - - - - - HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

What I can't find is how to handle a socket that the client closes with Ctrl+C. Akka Streams uses Akka IO to manage the TCP connection internally, so it must emit one of its connection-closed messages, such as PeerClosed, when the socket is closed. My understanding of Akka IO tells me that I should receive some notification when the socket closes, but I can't find how to get it with Akka Streams. Is there a way to do that?

Upvotes: 1

Views: 387

Answers (1)

Levi Ramsey

Reputation: 20591

connection.handleWith(echo) is syntactic sugar for connection.flow.joinMat(echo)(Keep.right).run(), so it returns the materialized value of echo. A flow built only from Flow[ByteString] with via, map, and takeWhile materializes NotUsed, which tells you nothing about how the stream ended. However, you can attach stages to echo that materialize something more useful.

One of these is .watchTermination:

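import akka.Done
import akka.stream.scaladsl.{Flow, Framing, Keep}
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.{Failure, Success}

connections.runForeach { connection =>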
  println(s"New connection from: ${connection.remoteAddress}")

  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")   // complete the stream when the client sends "exit"
    .map(_ + "!!!\n")
    .map(ByteString(_))
    // change the materialized value to a Future[Done]
    .watchTermination()(Keep.right)

  // you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
  //  if you don't already
  connection.handleWith(echo).onComplete {
    case Success(_) => println("stream completed successfully")
    case Failure(e) => println(e.getMessage)
  }
}

This will not distinguish between your side and the remote side closing the connection normally; it only distinguishes normal completion from the stream failing.
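To see that behaviour without opening a real socket, here is a minimal sketch that runs the same echo flow against in-memory sources standing in for the client's bytes. It assumes Akka 2.6 or later (so the implicit ActorSystem provides the materializer); the object name WatchTerminationSketch, the helper terminationOf, and the "connection reset" exception are made up for illustration and are not part of Akka or of the code above.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Try

object WatchTerminationSketch {
  // Assumption: Akka 2.6+, where an implicit ActorSystem supplies the Materializer.
  implicit val system: ActorSystem = ActorSystem("watch-termination-sketch")

  // Same echo logic as in the answer; materializes a Future[Done] via watchTermination.
  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit")
    .map(_ + "!!!\n")
    .map(ByteString(_))
    .watchTermination()(Keep.right)

  // Hypothetical helper: runs `echo` on a stand-in for the client's bytes
  // and waits (up to 3 seconds) for the termination Future.
  def terminationOf(client: Source[ByteString, Any]): Try[Done] = {
    val done: Future[Done] = client.viaMat(echo)(Keep.right).to(Sink.ignore).run()
    Try(Await.result(done, 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    // The source simply ends, as when the peer closes the connection normally.
    val peerClosed = terminationOf(Source.single(ByteString("hello\n")))
    // takeWhile ends the stream from our side when it sees "exit".
    val userExit = terminationOf(
      Source(List(ByteString("hello\n"), ByteString("exit\n"), ByteString("ignored\n"))))
    // The source fails, standing in for a connection error.
    val failed = terminationOf(Source.failed[ByteString](new RuntimeException("connection reset")))

    println(s"peer closed: $peerClosed")
    println(s"user exit:   $userExit")
    println(s"failed:      $failed")

    system.terminate()
  }
}
```

In this sketch the first two cases should both end in a Success, which is why the Future alone cannot tell you which side closed the connection, while the failed source ends in a Failure carrying the exception.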

Upvotes: 3
