lichgo

Reputation: 533

Scala akka-http WebSocket: How to save the client connection and push message to the client when needed?

How can I keep a (web) client's connection in memory and then push outgoing messages to that client whenever needed?

I already have some simple code that replies to the client as soon as the server receives a message from it. How should I modify the code below to support server-initiated outgoing messages?

implicit val actorSystem = ActorSystem("akka-system")
implicit val flowMaterializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher

val ip = "127.0.0.1"
val port = 32000

val route = get {
    pathEndOrSingleSlash {
        complete("Welcome to websocket server")
    }
} ~
path("hello") {
    get {
        handleWebSocketMessages(echoService)
    }
}

def sendMessageToClient(msg: String): Unit = {

    // *** How to implement this?
    // *** How to save the client connection when it is first connected?
    //     Then how to send message to this connection?

}

val echoService = Flow[Message].collect {

    // *** Here the server pushes a message back when it receives one from the client

    case tm : TextMessage => TextMessage(Source.single("Hello ") ++ tm.textStream)
    case _ => TextMessage("Message type unsupported")
}

val binding = Http().bindAndHandle(route, ip, port)

Upvotes: 6

Views: 761

Answers (1)

In-Ho Yi

Reputation: 978

You can tap into the incoming stream with a .map call. Inside .map you can capture each value and then return the same message unchanged. For example:

  Flow[Message].collect {
    case tm : TextMessage =>
      TextMessage(Source.single("Hello ") ++ tm.textStream.via(
        Flow[String].map((message) => {println(message) /* capture value here*/; message})))
    case _ => TextMessage("Message type unsupported")
  }

Now, if your intention is to process those values and send out values later, what you want is not a single flow that maps each incoming message to a reply, but two independent streams: a sink for incoming messages and a source for outgoing ones. You can combine them with Flow.fromSinkAndSource, e.g.:

Flow.fromSinkAndSource[Message, Message](
  // The first argument must be a Sink, so terminate the capturing flow here,
  // or send the stream to another sink for more processing
  Flow[Message].collect { /* capture values */ }.to(Sink.ignore),
  source
)

In all likelihood, this source will be built with the graph DSL or backed by a hand-rolled actor, or you can use a reusable helper such as MergeHub.
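As one possible way to build such a source, here is a minimal sketch that uses Source.queue instead of an actor or MergeHub (a swapped-in technique, not something the answer above prescribes). It assumes Akka Streams 2.5 or later. The names ClientRegistry, connectionFlow and clients are hypothetical, and the buffer size of 16 and the dropHead strategy are arbitrary choices. Each connection registers its own queue when the flow is materialized, and sendMessageToClient pushes to every registered queue.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap
import scala.concurrent.ExecutionContext

import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source, SourceQueueWithComplete}

// Hypothetical helper: keeps one outgoing queue per connected client.
object ClientRegistry {
  private val clients = TrieMap.empty[String, SourceQueueWithComplete[Message]]

  // A fresh queue is created and registered every time this flow is
  // materialized, which happens once per WebSocket connection.
  def connectionFlow(implicit ec: ExecutionContext): Flow[Message, Message, NotUsed] = {
    // Incoming side: only strict text messages are handled in this sketch;
    // streamed and binary messages are ignored (a real server should drain them).
    val incoming = Sink.foreach[Message] {
      case TextMessage.Strict(text) => println(s"received: $text")
      case _                        => ()
    }

    // Outgoing side: the materialized queue is the handle used to push messages.
    val outgoing = Source
      .queue[Message](16, OverflowStrategy.dropHead) // assumed buffer size and strategy
      .mapMaterializedValue { queue =>
        val id = UUID.randomUUID().toString
        clients.put(id, queue)
        // Forget the queue once this connection's stream has finished.
        queue.watchCompletion().onComplete(_ => clients.remove(id))
        NotUsed
      }

    // Coupled: when one side terminates, the other is terminated as well.
    Flow.fromSinkAndSourceCoupled(incoming, outgoing)
  }

  // Pushes a text message to every currently connected client.
  // The Future returned by offer is ignored in this sketch.
  def sendMessageToClient(msg: String): Unit =
    clients.values.foreach(_.offer(TextMessage(msg)))
}
```

With the implicit executionContext from the question in scope, the route would call handleWebSocketMessages(ClientRegistry.connectionFlow) in place of echoService. To address a single client rather than all of them, the map could be keyed by something meaningful, such as a user id taken from the request, instead of a random UUID.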

Upvotes: 1
