softshipper
softshipper

Reputation: 34087

How to broadcast the received messages to two different flows

How to broadcast the received messages to two different flows

I am using the Akka Streams WebSocket client to send requests to, and receive data from, a WebSocket server. I would like to broadcast the data received from the WebSocket into two different flows. The image below should clarify the scenario:

enter image description here

As you can see in the image, each received message should be broadcast to two different flows, each of which then feeds its own separate sink.

The websocket client can be created as the following:

import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._

import scala.concurrent.Future

/** Minimal Akka HTTP WebSocket client: sends one text message and prints what comes back. */
object WebSocketClientFlow {

  /**
   * Opens a WebSocket connection, sends a single greeting and prints every
   * incoming text message until the stream completes.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Future[Done] is the materialized value of Sink.foreach,
    // emitted when the stream completes.
    // Every kind of Message is handled: an unmatched case would throw a
    // MatchError inside the sink and fail the whole stream.
    val incoming: Sink[Message, Future[Done]] =
      Sink.foreach[Message] {
        case TextMessage.Strict(text) =>
          println(text)
        case streamed: TextMessage =>
          // a streamed text message must be consumed before it can be printed
          streamed.textStream.runFold("")(_ + _).foreach(text => println(text))
        case binary: BinaryMessage =>
          // not interested in binary payloads, but the data stream still has to be drained
          binary.dataStream.runWith(Sink.ignore)
          ()
      }

    // send this as a message over the WebSocket
    val outgoing = Source.single(TextMessage("hello world!"))

    // flow to use (note: not re-usable!)
    val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

    // the materialized value is a tuple with
    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] with the stream completion from the incoming sink
    val (upgradeResponse, closed) =
      outgoing
        .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
        .toMat(incoming)(Keep.both) // also keep the Future[Done]
        .run()

    // just like a regular http request we can access the response status via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that the server supports WebSockets
    val connected = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Future.successful(Done)
      } else {
        Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
      }
    }

    // in a real application you would not side effect here
    connected.onComplete(println)
    closed.foreach(_ => println("closed"))
  }
}

Upvotes: 2

Views: 128

Answers (1)

Shanti Swarup Tunga
Shanti Swarup Tunga

Reputation: 641

You can use SinkShape to get the required flow

// Builds a Sink[Message, _] that broadcasts every incoming message to two
// independent flows, each of which ends in its own sink.
Sink.fromGraph(GraphDSL.create() {
  implicit b =>
    // brings the `~>` wiring operator into scope; without it the edges below do not compile
    import GraphDSL.Implicits._

    val bcast = b.add(Broadcast[Message](2))
    val flow1 = b.add(Flow[Message].map(m => m))
    val flow2 = b.add(Flow[Message].map(m => m))
    val sink1 = b.add(Sink.foreach[Message](println))
    val sink2 = b.add(Sink.foreach[Message](println))

    bcast ~> flow1 ~> sink1
    bcast ~> flow2 ~> sink2

    // expose the broadcast's single inlet as the inlet of the combined sink
    SinkShape(bcast.in)
})

The entire code is

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Sink that broadcasts each incoming message to two flows, each ending in its own sink.
  // The two inner sinks are passed to GraphDSL.create so that their materialized values
  // (one Future[Message] each, from Sink.head) are available: the combined Future[Done]
  // completes once BOTH inner sinks have completed, and fails if either of them fails.
  // Note: Sink.head takes only the first message on each branch and then cancels.
  // NOTE(review): on newer Akka versions GraphDSL.createGraph is the preferred spelling.
  val incomingSink: Sink[Message, Future[Done]] = Sink.fromGraph(
    GraphDSL.create(Sink.head[Message], Sink.head[Message]) { (first, second) =>
      first.zip(second).map(_ => Done)
    } { implicit b => (sink1, sink2) =>
      import GraphDSL.Implicits._
      val bcast = b.add(Broadcast[Message](2))
      val flow1 = b.add(Flow[Message].map(m => m))
      val flow2 = b.add(Flow[Message].map(m => m))

      bcast ~> flow1 ~> sink1
      bcast ~> flow2 ~> sink2

      // expose the broadcast's single inlet as the inlet of the combined sink
      SinkShape(bcast.in)
    })

  // send this as a message over the WebSocket
  val outgoing = Source.single(TextMessage("hello world!"))

  // flow to use (note: not re-usable!)
  val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org"))

  // the materialized value is a tuple with
  // upgradeResponse is a Future[WebSocketUpgradeResponse] that
  // completes or fails when the connection succeeds or fails
  // and closed is a Future[Done] that completes when both branches of incomingSink are done
  val (upgradeResponse, closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
      .toMat(incomingSink)(Keep.both) // also keep the Future[Done]
      .run()

  // just like a regular http request we can access the response status via upgrade.response.status
  // status code 101 (Switching Protocols) indicates that the server supports WebSockets
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
    }
  }

  // in a real application you would not side effect here
  connected.onComplete(println)
  closed.foreach(_ => println("closed"))

Upvotes: 2

Related Questions