src091
src091

Reputation: 2847

Akka Streams WebSocket to send info on arbitrary events

I want to implement a service where a number of clients can connect to a server using a WebSocket. The server should be able to broadcast messages to all the connected clients on arbitrary internal events. So far I have this code:

import akka.http.scaladsl.server.RouteResult.route2HandlerFlow
import akka.http.scaladsl.server.Directives._
implicit val system = ActorSystem("Server")
implicit val mat = ActorMaterializer()

// The source to broadcast (just ints for simplicity)
val dataSource = Source(1 to 1000).throttle(1, 1.second, 1, ThrottleMode.Shaping).map(_.toString)

// Go via BroadcastHub to allow multiple clients to connect
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
  dataSource.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)

val producer: Source[String, NotUsed] = runnableGraph.run()

// Optional - add sink to avoid backpressuring the original flow when no clients are attached
producer.runWith(Sink.ignore)

val wsHandler: Flow[Message, Message, NotUsed] =
  Flow[Message]
    .mapConcat(_ => Nil) // Ignore any data sent from the client
    .merge(producer)  // Stream the data we want to the client
    .map(l => TextMessage(l.toString))

val route =
  path("ws") {
    handleWebSocketMessages(wsHandler)
  }

val port = 8080

println("Starting up route")
Http().bindAndHandle(route2HandlerFlow(route), "127.0.0.1", port)
println(s"Started HTTP server on port $port")

It successfully broadcasts current ticks to the connected clients. How should I modify this code to be able to also broadcast arbitrary messages, not just scheduled ticks?

Clarification:

By "arbitrary messages" I don't mean other sources like a file or a database, but rather an ability to send a message to a specialized Source and get it relayed to the currently connected clients. Such a message may be a result of some internal system event which can happen at any time.

Upvotes: 1

Views: 563

Answers (2)

Gabriel Francisco
Gabriel Francisco

Reputation: 350

All you have to do is change the dataSource.

Fetching data from a csv file:

val dataSource = FileIO.fromPath(Paths.get("file.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true)
  .map(_.utf8String))

Fetching data from SQS (Alpakka):

val dataSource = SqsSource(queue, sqsSourceSettings).take(100).map(_.getBody)

Fetching data from a table using Slick (Alpakka):

val dataSource = Slick.source(sql"SELECT NAME FROM USERS".as[String])

Basically you need to understand three things:

  • Source: one output
  • Flow: one input, one output
  • Sink: one input.

Knowing this, you can build linear pipelines, just like:

source.via(flow1).via(flow2).runWith(sink)

So, you can easily "plug" sources into an existing pipeline and run them with any sink you want:

val pipeline = flow1.via(flow2)

val fileSource = FileIO.fromPath(Paths.get("file.csv"))
  .via(Framing.delimiter(ByteString("\n"), 256, true)
  .map(_.utf8String))
  .via(pipeline)
  .runWith(sink)

val sqsSource = Slick
  .source(sql"SELECT NAME FROM USERS".as[String])
  .via(pipeline)
  .runWith(sink)

val slickFlow = SqsSource(queue, sqsSourceSettings).take(100)
  .map(_.getBody)
  .via(pipeline)
  .runWith(sink)

Edit: Well, besides the actorRef strategy, you can also use a Source.queue and produce your messages by calling queue.offer:

def source = Source
  .queue(Int.MaxValue, OverflowStrategy.backpressure)
  .map { name: String => s"hello, $name" }
  .toMat(BroadcastHub.sink[String])(Keep.both)
  .run()

def wsHandler(s: Source[String, NotUsed]): Flow[Message, Message, NotUsed] = Flow[Message]
  .mapConcat(_ => Nil)
  .merge(s)
  .map(TextMessage(_))

import scala.concurrent.duration._

val websocketRoute =
  path("greeter" / Segment) { name =>
    val (queue, s) = source

    Source
      .tick(
        initialDelay = 1 second,
        interval = 1 second,
        tick = None
      )
      .map { _ =>
        queue.offer(name)
      }
      .runWith(Sink.ignore)

    handleWebSocketMessages(wsHandler(s))
  }

External links:

Upvotes: 2

Jeffrey Chung
Jeffrey Chung

Reputation: 19527

One idea is to use Source.actorRef:

val (actor, source) = Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
  .toMat(BroadcastHub.sink[String])(Keep.both)
  .run()

val wsHandler: Flow[Message, Message, NotUsed] = Flow[Message]
  .mapConcat(_ => Nil)
  .merge(source)
  .map(l => TextMessage(l.toString))

Messages sent to the materialized ActorRef are emitted if there is downstream demand. If there is no downstream demand, the elements are buffered, and the provided overflow strategy is used if the buffer is full. Note that there is no backpressure with this approach. You can send messages from a Source, as well as arbitrary messages, to this actor:

Source(1 to 1000)
  .throttle(1, 1.second, 1, ThrottleMode.Shaping)
  .map(_.toString)
  .runForeach(msg => actor ! msg)

actor ! "bacon"
actor ! "ribeye"

Upvotes: 2

Related Questions