invariant

Reputation: 8900

How to keep a WebSocket connection open indefinitely (akka-http)

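Server code:

import akka.http.scaladsl.model.ws.{BinaryMessage, Message}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueue}
import akka.util.ByteString
import scala.concurrent.Promise
// Execution context for sourceQueue.map below
import scala.concurrent.ExecutionContext.Implicits.global

object EchoService {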

  def route: Route = path("ws-echo") {
    get {
      handleWebSocketMessages(flow)
    }
  } ~ path("send-client") {
    get {
      sourceQueue.map(q => {
        println(s"Offering message from server")
        q.offer(BinaryMessage(ByteString("ta ta")))
      } )
      complete("Sent from server successfully")
    }
  }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(Sink.ignore, source)(Keep.right)
}

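Client code:

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source, SourceQueue}
import scala.concurrent.{Future, Promise}

object Client extends App {

  implicit val actorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer = ActorMaterializer()
  // Execution context for the Future callbacks below (upgradeResponse.map, onComplete)
  import actorSystem.dispatcher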

  val config = actorSystem.settings.config
  val interface = config.getString("app.interface")

  val port = config.getInt("app.port")


  // print each incoming strict text message
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)

      case _ => println(s"received unknown message format")
    }

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](100, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    })
    (s, p.future)
  }

  val flow =
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right)


  val (upgradeResponse, sourceClosed) =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

  val connected = upgradeResponse.map { upgrade =>
    // just like a regular http request we can get 404 NotFound,
    // with a response body, that will be available from upgrade.response
    if (upgrade.response.status == StatusCodes.SwitchingProtocols || upgrade.response.status == StatusCodes.OK ) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  connected.onComplete(println)

}

When I hit http://localhost:8080/send-client I see messages arriving at the client, but after a while, if I try to send to the client again, no messages show up on the client side. I also tried source.concatMat(Source.maybe)(Keep.right), but no luck :(

Edit: I tested with a JS client, and it looks like the connection/flow gets closed on the server end. Is there any way to prevent this? And how can I listen for this event when using the akka-http WebSocket client?

Upvotes: 5

Views: 3264

Answers (1)

invariant

Reputation: 8900

Hi,

The connection does not stay open because all HTTP connections have an idle-timeout enabled by default, which keeps the system from leaking connections when clients disappear without any signal.

One way to overcome this limitation (and actually my recommended approach) is to inject keep-alive messages on the client side: messages that the server otherwise ignores, but that tell the underlying HTTP server the connection is still live.

You can also override the idle-timeouts in the HTTP server configuration with a larger value, but I don't recommend that.
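For completeness, here is a rough sketch of what such an override could look like when done programmatically. The object name and the 10-minute value are made up for illustration (not a recommendation), and I am assuming the akka.http.server.idle-timeout and akka.http.client.idle-timeout keys from the akka-http reference configuration; check the reference.conf of your version before relying on them.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper object, not part of the original code.
object IdleTimeoutConfigSketch {
  // Arbitrary illustrative value: raise both idle-timeouts to 10 minutes.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.http.server.idle-timeout = 10 minutes
      |akka.http.client.idle-timeout = 10 minutes
      |""".stripMargin)

  // Layer the overrides on top of application.conf / reference.conf.
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```

The resulting config could then be passed when creating the system, e.g. ActorSystem("akka-system", IdleTimeoutConfigSketch.config). The same two keys can of course go straight into application.conf instead.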

If you are using stream-based clients, injecting heartbeats when necessary is as simple as calling keepAlive and giving it a time interval and a factory for the message you want to inject: http://doc.akka.io/api/akka/2.4.7/index.html#akka.stream.scaladsl.Flow@keepAliveU>:Out:FlowOps.this.Repr[U]

That combinator makes sure that no silent period lasts longer than the interval you give it: it injects elements when needed to keep this contract, and injects nothing if there is enough background traffic.
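To see the combinator in isolation, here is a minimal sketch without any HTTP involved. Source.maybe stands in for a client that has nothing to say, and the object name, method name, "heartbeat" string and 200 ms interval are all just placeholders I picked for the demo.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original code.
object KeepAliveSketch {
  def collectHeartbeats(n: Int): Seq[String] = {
    implicit val system = ActorSystem("keepalive-sketch")
    // Matches the Akka version used in this thread; newer versions can
    // rely on the implicit system instead of an explicit materializer.
    implicit val materializer = ActorMaterializer()
    try {
      val injected = Source.maybe[String]        // never emits on its own
        .keepAlive(200.millis, () => "heartbeat") // fills every idle gap
        .take(n)                                  // stop after n elements
        .runWith(Sink.seq)
      Await.result(injected, 5.seconds)
    } finally system.terminate()
  }
}
```

Since the upstream never emits, every element that comes out of KeepAliveSketch.collectHeartbeats(3) is an injected "heartbeat". With real traffic flowing through, the injected elements would only appear in the idle gaps.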

-Endre

Thank you Endre :) Working snippet:

// on client side
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration

  val (source, sourceQueue) = {
    val p = Promise[SourceQueue[Message]]
    val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue(m => {
      p.trySuccess(m)
      m
    }).keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat")) // send a heartbeat after 1 second of silence
    (s, p.future)
  }
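On the second part of the question (noticing on the client when the connection goes away), one possible approach is to keep the sink's materialized Future[Done], which completes once the inbound side of the flow finishes. Below is a rough sketch with plain streams only; a finite Source stands in for the server, String stands in for Message, and the object and value names are made up. With the real client you would probably want Keep.both, so that you get the close signal alongside the queue.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original code.
object CloseSignalSketch {
  def run(): Done = {
    implicit val system = ActorSystem("close-signal-sketch")
    implicit val materializer = ActorMaterializer()
    try {
      val printSink: Sink[String, Future[Done]] = Sink.foreach[String](println)
      // Keep.left keeps the sink's Future[Done] as the flow's materialized value.
      val clientFlow: Flow[String, String, Future[Done]] =
        Flow.fromSinkAndSourceMat(printSink, Source.maybe[String])(Keep.left)
      // Stand-in for the server side: two inbound messages, then completion.
      val closed: Future[Done] =
        Source(List("a", "b")).viaMat(clientFlow)(Keep.right).to(Sink.ignore).run()
      // Completes once the inbound side has finished.
      Await.result(closed, 5.seconds)
    } finally system.terminate()
  }
}
```

In the client above, something like closed.onComplete(println) on that future would then log both a normal close and a failure.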

Upvotes: 6
