I am using akka-http to implement a WebSocket server.
The request handler code is as follows:
    def decodeService: Flow[Message, Message, _] = {
      Flow[Message].map {
        case BinaryMessage.Strict(encodeMsg) =>
          try {
            WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
          } catch {
            case e: Exception => {
              println("[ERROR] failed to send BinaryMessage.Strict: " + e)
              TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
            }
          }
          TextMessage("[INFO] BinaryMessage.Strict")
        case BinaryMessage.Streamed(streamedMsg) => {
          implicit val system = ActorSystem("DecoderSystem")
          implicit val materializer = ActorMaterializer()
          val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq)
          streamedMsgFuture.onComplete { completedStream =>
            var completeBytestring = new ByteStringBuilder()
            //I'm sure there's a better way to do this.. but hey, it's O(n)
            completedStream.foreach { x =>
              x.foreach { y =>
                completeBytestring ++= y
              }
            }
            try {
              WebServer.getWorkerActor ! QueueWork(protoMsg(this, completeBytestring.result()))
            } catch {
              case e: Exception => {
                println("[ERROR] failed to send BinaryMessage.Streamed: " + e)
                TextMessage("[ERROR] failed receiving BinaryMessage.Streamed")
              }
            } finally {
              completeBytestring.clear()
            }
          }
          TextMessage("[INFO] BinaryMessage.Streamed")
        }
        case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
        case _ => TextMessage("Message type unsupported")
      }
    }
My web server receives a stream of data roughly once a minute, and I have seen its memory usage keep growing. If I don't process the Streamed messages, memory stays stable. The connection between client and server is persistent.
Have I used Flow/Sink/Source incorrectly? How do I flush the stream?
Thanks
Upvotes: 1
Views: 513
Well, the most glaring problem is that you create an entirely new ActorSystem for each streamed message you receive. An ActorSystem is like a thread pool for your actors; you want to create as few of them as possible, ideally only one for the entire application. Not only do you create one for each message, you also never shut them down: all of the dispatchers configured in an ActorSystem, and all of the resources it holds, stay around forever. Of course your memory usage will grow if you receive lots of streamed messages.
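To make the lifecycle concrete, here is a minimal sketch (the object name, the system name and the 10-second timeout are arbitrary assumptions, not taken from the question): one ActorSystem is created at startup, reused everywhere, and terminated explicitly on shutdown. A system that is never terminated keeps its dispatcher threads alive.

```scala
import akka.actor.ActorSystem

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical holder for the single application-wide ActorSystem.
object SingleSystemSketch {
  // Created once for the whole application, never once per message.
  lazy val system: ActorSystem = ActorSystem("WebServer")

  // Releases the dispatchers and other resources held by the system.
  def shutdown(): Unit =
    Await.result(system.terminate(), 10.seconds)
}
```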
Since you use akka-http, you are bound to have one ActorSystem already, in the place where you invoke Http().bind*. You need to make it accessible inside the decodeService method. Also, the way you compute the combined byte string seems overly complicated to me. Consider writing it like this:
    // Needs an implicit Materializer and ExecutionContext in scope,
    // plus: import scala.util.control.NonFatal
    def decodeService: Flow[Message, Message, _] = Flow[Message].mapAsync(4) {
      case m: BinaryMessage.Strict =>
        Future.successful(m)
      case BinaryMessage.Streamed(streamMsg) =>
        // Concatenate all chunks into a single strict message
        streamMsg.runReduce(_ ++ _).map(BinaryMessage.Strict)
      case m =>
        Future.successful(m)
    }.map {
      case BinaryMessage.Strict(encodeMsg) =>
        try {
          WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
          TextMessage("[INFO] BinaryMessage.Strict")
        } catch {
          case NonFatal(e) =>
            println("[ERROR] failed to send BinaryMessage.Strict: " + e)
            TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
        }
      case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
      case _ => TextMessage("Message type unsupported")
    }
Here, we first transform all binary messages to BinaryMessage.Strict, and then process them as in your original code. Note that you have to return the confirmation message inside the try block, because otherwise the success message is returned even when an exception is thrown. Also, if you decide not to handle text messages at all, the code could become even simpler:
    def decodeService: Flow[Message, Message, _] = Flow[Message]
      .filterNot(_.isText)
      .mapAsync(4) {
        case BinaryMessage.Strict(binary) =>
          Future.successful(binary)
        case BinaryMessage.Streamed(binaryStream) =>
          binaryStream.runReduce(_ ++ _)
      }
      .map { encodeMsg =>
        try {
          WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
          TextMessage("[INFO] BinaryMessage.Strict")
        } catch {
          case NonFatal(e) =>
            println("[ERROR] failed to send BinaryMessage.Strict: " + e)
            TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
        }
      }
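Both versions call runReduce and Future.map, which need an implicit Materializer and ExecutionContext in scope. Below is a minimal sketch of one way to supply them from the single application-wide ActorSystem. It assumes the older akka-http API the question uses (ActorMaterializer, bindAndHandle). DecoderService, onBytes, the "decode" path, and the host and port are hypothetical names and values, not from the original code; onBytes stands in for the WebServer.getWorkerActor ! QueueWork(...) call. The sketch uses runFold instead of runReduce, because a reduce over an empty stream fails with NoSuchElementException.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical wrapper: the implicits are supplied once by whoever owns the
// application's single ActorSystem, so no system is created per message.
class DecoderService(onBytes: ByteString => Unit)(
    implicit mat: Materializer, ec: ExecutionContext) {

  def decodeService: Flow[Message, Message, _] =
    Flow[Message].mapAsync[Message](4) {
      case BinaryMessage.Strict(bytes) =>
        Future.successful(reply(bytes))
      case BinaryMessage.Streamed(chunks) =>
        // Runs on the shared materializer; runFold also copes with an empty stream.
        chunks.runFold(ByteString.empty)(_ ++ _).map(reply)
      case text: TextMessage =>
        // Drain the text frames so an unread stream cannot stall the connection.
        text.textStream.runWith(Sink.ignore)
          .map(_ => TextMessage("Message type unsupported"))
    }

  // The success message is produced inside the try, so a failure yields the error reply.
  private def reply(bytes: ByteString): Message =
    try {
      onBytes(bytes)
      TextMessage("[INFO] BinaryMessage received")
    } catch {
      case NonFatal(_) => TextMessage("[ERROR] failed receiving BinaryMessage")
    }
}

object WebServerSketch {
  import akka.http.scaladsl.server.Directives._

  def start(): Unit = {
    // The only ActorSystem and materializer in the application.
    implicit val system: ActorSystem = ActorSystem("WebServer")
    implicit val materializer: Materializer = ActorMaterializer()
    import system.dispatcher

    val decoder = new DecoderService(bytes => println(s"received ${bytes.length} bytes"))
    val route = path("decode")(handleWebSocketMessages(decoder.decodeService))
    Http().bindAndHandle(route, "localhost", 8080)
  }
}
```

Passing the implicits through the constructor keeps decodeService free of any ActorSystem creation, which is the point of the advice above.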
Upvotes: 3