jo_wil

Reputation: 369

Akka Streams and Scala Play server

I have a server written in Scala with Play 2.6.

I am trying to have the websocket:

  1. Receive a request from a client
  2. Process that request
  3. Broadcast the result to all clients if it is a Right, or send the error only to the client who sent the request if it is a Left

I have the messages broadcasting to all the clients right now. Does anyone know how to respond only to the sender in the error case?

  val processFlow = Flow[String].map(process).map(_.toString)

  val (sink, source) = { 
    MergeHub.source[String]
      .via(processFlow)
      .toMat(BroadcastHub.sink[String])(Keep.both)
      .run()
  }

  val websocketFlow = Flow.fromSinkAndSource(sink, source)

  def ws = WebSocket.accept[String, String] { request =>  
    websocketFlow
  }

  def process(message: String): Either[String, String] = { 
    if (message == "error") { // replace with any error condition
      Left ("ERROR " ++ message)
    } else {
      Right (message ++ " processed")
    }   
  }

Upvotes: 2

Views: 200

Answers (1)

vdebergue

Reputation: 2404

If you track the sender in your flow, you can then filter the received messages before sending them on the websocket:

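import java.util.UUID
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub}
import play.api.mvc.WebSocket

case class ProcessResult(senderId: String, result: Either[String, String])

// run() needs an implicit Materializer in scope
val (sink, source) = {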
  MergeHub.source[ProcessResult]
    .toMat(BroadcastHub.sink[ProcessResult])(Keep.both)
    .run()
}
val websocketFlow = Flow.fromSinkAndSource(sink, source)

def ws = WebSocket.accept[String, String] { request =>
  // create a random id to identify the sender
  val senderId = UUID.randomUUID().toString
  Flow[String]
    .map(process)
    .map(result => ProcessResult(senderId, result))
    // broadcast the result to the other websockets
    .via(websocketFlow)
    // filter the results to only keep the errors for the sender
    .collect {
      // errors go back only to the connection that caused them
      case ProcessResult(sender, Left(error)) if sender == senderId => error
      // successful results go to every connection
      case ProcessResult(_, Right(result)) => result
    }
}

def process(message: String): Either[String, String] = { 
  if (message == "error") { // replace with any error condition
    Left ("ERROR " ++ message)
  } else {
    Right (message ++ " processed")
  }   
}
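
The filtering rule at the end of `ws` can also be pulled out into a pure function, which makes it possible to check the routing without starting any streams. This is only a sketch: `Routing` and `visibleTo` are hypothetical names that are not part of the code above, and `ProcessResult` is repeated so the snippet stands on its own.

```scala
// Same shape as the ProcessResult used in the answer.
final case class ProcessResult(senderId: String, result: Either[String, String])

// Hypothetical helper (not from the original answer): decides what a given
// connection should emit for one broadcast element.
object Routing {
  def visibleTo(connectionId: String)(pr: ProcessResult): Option[String] =
    pr match {
      // an error is shown only to the connection that produced it
      case ProcessResult(sender, Left(error)) if sender == connectionId => Some(error)
      // errors from other connections are dropped
      case ProcessResult(_, Left(_)) => None
      // successful results are shown to every connection
      case ProcessResult(_, Right(result)) => Some(result)
    }
}
```

Inside `ws`, the final stage could then presumably be written as `.mapConcat(pr => Routing.visibleTo(senderId)(pr).toList)`, which keeps the per-connection logic in one testable place.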

Upvotes: 1
