Reputation: 792
I'm quite new to Akka Streams and I'd like to learn how to handle a TCP socket for a project of mine. I took this piece of code from the official Akka Streams documentation.
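import akka.stream.scaladsl.{Flow, Framing, Source, Tcp}
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.util.ByteString
import scala.concurrent.Future
val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)
connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")
  // Split the byte stream into lines, decorate each line, and send it back
  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))
  connection.handleWith(echo)
}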
When I connect from the terminal using netcat, I can see that the Akka Streams TCP socket works as expected. I also found that if I need to close the connection in response to a user message, I can use takeWhile, as follows:
import akka.stream.scaladsl.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)
connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")
  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit") // <------ HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))
  connection.handleWith(echo)
}
What I can't find is how to handle a socket closed by a CMD + C action. Akka Streams uses Akka IO to manage the TCP connection internally, so it must receive one of its PeerClosed messages when the socket is closed. My understanding of Akka IO tells me that I should get some feedback when the socket closes, but I can't find how to do that with Akka Streams. Is there a way to handle that?
Upvotes: 1
Views: 387
Reputation: 20591
connection.handleWith(echo) is syntactic sugar for connection.flow.joinMat(echo)(Keep.right).run(), which will have the materialized value of echo, which is generally not useful. Flow.via.map.takeWhile has NotUsed as a materialized value, so that's also basically useless. However, you can attach stages to echo which will materialize differently.
One of these is .watchTermination:
import akka.Done
import akka.stream.scaladsl.Keep
import scala.util.{Failure, Success}
connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")
  val echo: Flow[ByteString, ByteString, Future[Done]] = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .takeWhile(_.toLowerCase.trim != "exit") // <------ HERE
    .map(_ + "!!!\n")
    .map(ByteString(_))
    // change the materialized value to a Future[Done]
    .watchTermination()(Keep.right)
  // you may need to have an implicit ExecutionContext in scope, e.g. system.dispatcher,
  // if you don't already
  connection.handleWith(echo).onComplete {
    case Success(_) => println("stream completed successfully")
    case Failure(e) => println(e.getMessage)
  }
}
This will not distinguish between your side and the remote side closing the connection normally; it only distinguishes normal completion from the stream failing.
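If you do need to tell the two normal cases apart, one possible workaround is to record, per connection, whether the "exit" line was what ended the stream. The following is only a sketch under assumptions: the object name CloseReason, the helpers echoFlow and handle, and the AtomicBoolean flag are all made up for illustration, and it assumes Akka 2.6 or later. Whether an abrupt client shutdown shows up as a normal completion or as a failure may depend on how the peer closes the socket, so treat the printed messages as a guess rather than a guarantee.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import akka.Done
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Framing, Keep, Tcp}
import akka.util.ByteString

// Hypothetical helper object, not part of Akka
object CloseReason {

  // Same echo flow as above, except that takeWhile also records in
  // `exitRequested` that the stream was ended by an "exit" line.
  def echoFlow(exitRequested: AtomicBoolean): Flow[ByteString, ByteString, Future[Done]] =
    Flow[ByteString]
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
      .map(_.utf8String)
      .takeWhile { line =>
        val isExit = line.toLowerCase.trim == "exit"
        if (isExit) exitRequested.set(true)
        !isExit
      }
      .map(_ + "!!!\n")
      .map(ByteString(_))
      // materialize a Future[Done] that completes when the flow terminates
      .watchTermination()(Keep.right)

  // One flag per connection, so concurrent connections do not interfere
  def handle(connection: Tcp.IncomingConnection)(
      implicit mat: Materializer, ec: ExecutionContext): Unit = {
    val exitRequested = new AtomicBoolean(false)
    connection.handleWith(echoFlow(exitRequested)).onComplete {
      case Success(_) if exitRequested.get =>
        println("closed by us after the client sent exit")
      case Success(_) =>
        println("completed without exit, so presumably the peer closed")
      case Failure(e) =>
        println(s"stream failed: ${e.getMessage}")
    }
  }
}
```

Inside connections.runForeach you would then call CloseReason.handle(connection) instead of building echo inline. The flag is an AtomicBoolean because it is written from inside the stream and read from the Future callback, which may run on a different thread.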
Upvotes: 3