practice2perfect
practice2perfect

Reputation: 507

Scala & Play Websockets: Storing messages exchanged

I started playing around scala and came to this particular boilerplate of web socket chatroom in scala.

They use MessageHub.source() and BroadcastHub.sink() as their Source and Sink for sending the messages to all connected clients.

The example is working fine for exchanging messages as it is.

private val (chatSink, chatSource) = {
  // Don't log MergeHub$ProducerFailed as error if the client disconnects.
  // recoverWithRetries -1 is essentially "recoverWith"
  val source = MergeHub.source[WSMessage]
    .log("source")
    .recoverWithRetries(-1, { case _: Exception ⇒ Source.empty })

  val sink = BroadcastHub.sink[WSMessage]
  source.toMat(sink)(Keep.both).run()
}

private val userFlow: Flow[WSMessage, WSMessage, _] = {
 Flow.fromSinkAndSource(chatSink, chatSource)
}

def chat(): WebSocket = {
  WebSocket.acceptOrResult[WSMessage, WSMessage] {
    case rh if sameOriginCheck(rh) =>
      Future.successful(userFlow).map { flow =>
        Right(flow)
      }.recover {
        case e: Exception =>
          val msg = "Cannot create websocket"
          logger.error(msg, e)
          val result = InternalServerError(msg)
          Left(result)
      }

    case rejected =>
      logger.error(s"Request ${rejected} failed same origin check")
      Future.successful {
      Left(Forbidden("forbidden"))
      }
  }
}

I want to store the messages that are exchanged in the chatroom in a DB.

I tried adding map and fold functions to source and sink to get hold of the messages that are sent but I wasn't able to.

I tried adding a Flow stage between MergeHub and BroadcastHub like below

val flow = Flow[WSMessage].map(element => println(s"Message: $element"))
source.via(flow).toMat(sink)(Keep.both).run()

But it throws a compilation error that cannot reference toMat with such signature.

Can someone help or point me how can I get hold of messages that are sent and store them in DB.

Link for full template:

https://github.com/playframework/play-scala-chatroom-example

Upvotes: 0

Views: 296

Answers (1)

Frederic A.
Frederic A.

Reputation: 3514

Let's look at your flow:

val flow = Flow[WSMessage].map(element => println(s"Message: $element"))

It takes elements of type WSMessage, and returns nothing (Unit). Here it is again with the correct type:

val flow: Flow[Unit] = Flow[WSMessage].map(element => println(s"Message: $element"))

This will clearly not work as the sink expects WSMessage and not Unit.

Here's how you can fix the above problem:

val flow = Flow[WSMessage].map { element =>
  println(s"Message: $element")
  element
 }

Not that for persisting messages in the database, you will most likely want to use an async stage, roughly:

val flow = Flow[WSMessage].mapAsync(parallelism) { element =>
  println(s"Message: $element")
  // assuming DB.write() returns a Future[Unit]
  DB.write(element).map(_ => element)
 }

Upvotes: 1

Related Questions