anirudhas
anirudhas

Reputation: 71

Heap Memory is growing while using akka-http for implementing web socker server

I am using akka-http for implementing web socket server.

Following is the request handler code for it:

  def decodeService: Flow[Message, Message, _] = { 
    Flow[Message].map { 
      case BinaryMessage.Strict(encodeMsg) => 
        try {
          WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
        } catch {
          case e: Exception => {
            println("[ERROR] failed to send BinaryMessage.Strict: " + e)
            TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
          }
        }
        TextMessage("[INFO] BinaryMessage.Strict")

      case BinaryMessage.Streamed(streamedMsg) => {
        implicit val system = ActorSystem("DecoderSystem")
        implicit val materializer = ActorMaterializer()
        val streamedMsgFuture: Future[Seq[ByteString]] = streamedMsg.runWith(Sink.seq)
        streamedMsgFuture.onComplete { completedStream =>
          var completeBytestring = new ByteStringBuilder()
          //I'm sure there's a better way to do this.. but hey, it's O(n)
          completedStream.foreach { x => 
            x.foreach { y => 
              completeBytestring ++= y  
            }
          }
          try {
              WebServer.getWorkerActor ! QueueWork(protoMsg(this, completeBytestring.result()))
          } catch {
            case e: Exception => {
              println("[ERROR] failed to send BinaryMessage.Streamed: " + e)
              TextMessage("[ERROR] failed receiving BinaryMessage.Streamed")
            }
          } finally {
            completeBytestring.clear()
          }
        }
        TextMessage("[INFO] BinaryMessage.Streamed")
      }

      case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
      case _ => TextMessage("Message type unsupported")
    }
  }

My web server gets stream of data frequently every 1min. I have seen memory is growing. If I don't process Streamed messages, its able to hold. Also connection between client and server is persistent.

Have I used Flows/Sink/Source wrongly? How do flush stream?

Thanks

Upvotes: 1

Views: 513

Answers (1)

Vladimir Matveev
Vladimir Matveev

Reputation: 128131

Well, the most glaring problem is that you create an entire new ActorSystem for each streaming message you receive. An ActorSystem is like a thread pool for your actors; you want to create as few as possible of them, ideally only one for the entire application. And not only you create them for each message, you do not close them - all of the dispatchers configured in an ActorSystem, and all of the resources it holds will remain forever hanging. Of course, your memory usage will grow, if you receive lots of streamed messages.

Since you use akka-http, you are bound to have one ActorSystem where you invoke Http().bind*. You need to make it accessible inside the decodeService method. Also, the way you compute the combined byte stream seems overly complicated to me. Consider writing it like this:

def decodeService: Flow[Message, Message, _] = Flow[Message].mapAsync(4) {
  case m: BinaryMessage.Strict =>
    Future.successful(m)
  case BinaryMessage.Streamed(streamMsg) =>
    streamMsg.runReduce(_ ++ _).map(BinaryMessage.Strict)
  case m =>
    Future.successful(m)
}.map {
  case BinaryMessage.Strict(encodeMsg) =>
    try {
      WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
      TextMessage("[INFO] BinaryMessage.Strict")
    } catch {
      case NonFatal(e) =>
        println("[ERROR] failed to send BinaryMessage.Strict: " + e)
        TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
    }
  case TextMessage.Strict(txt) => TextMessage("Succesfully receive text message")
  case _ => TextMessage("Message type unsupported")
}

Here, first we transform all binary messages to BinaryMessage.Strict, and then we are processing them like in your original code. Note that you have to write the confirmation message inside the try block, because otherwise even if you have an exception, the successful message will be returned. Also, if you decide not to handle text messages at all, the code could become even simpler:

def decodeService: Flow[Message, Message, _] = Flow[Message]
  .filterNot(_.isText)
  .mapAsync(4) {
    case BinaryMessage.Strict(binary) =>
      Future.successful(binary)
    case BinaryMessage.Stream(binaryStream) =>
      binaryStream.runReduce(_ ++ _)
  .map { encodeMsg =>
    try {
      WebServer.getWorkerActor ! QueueWork(protoMsg(this, encodeMsg))
      TextMessage("[INFO] BinaryMessage.Strict")
    } catch {
      case NonFatal(e) =>
        println("[ERROR] failed to send BinaryMessage.Strict: " + e)
        TextMessage("[ERROR] failed receiving BinaryMessage.Strict")
    }
  }

Upvotes: 3

Related Questions