alt-f4

Reputation: 2306

Does every part of my Akka-http stream have to be producing output with the type Message?

I am trying to consume a stream via WebSockets, and I would like to parse the Messages received and transform them before they reach the Sink. However, I keep getting errors whenever I use a Source or a Sink that does not take in Message as input.

According to the documentation:

Therefore a WebSocket connection is modelled as either something you connect a Flow[Message, Message, Mat] to or a Flow[Message, Message, Mat] that you connect a Source[Message, Mat] and a Sink[Message, Mat] to.

I am still not sure if I am getting this correctly. My confusion is: do Sources, Flows & Sinks using Akka-http websockets have to always pass around the type Message? Is there a way around it? And most importantly, what is the best practice here?

Here is a simplified snippet of my code. It is not intended to be runnable; it is only meant to illustrate my question.

val outgoing = Source.maybe[Message]

val decoder = Flow[Message] map {x => TextMessage("Hello from decode")}

// Do I need to pass a Message here?
val wrongDecoder = Flow[String] map {x => "Help :( I can't Sink! Maybe because I'm String?"}

val sink: Sink[Message, Future[Done]] = Sink.foreach[Message] {case message: TextMessage.Strict => message.text}

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(wrongDecoder)(Keep.left) // IDE compiler tells me it expected a Graph but found a Flow?
    .toMat(sink)(Keep.both)
    .run()

Upvotes: 0

Views: 87

Answers (1)

Levi Ramsey

Reputation: 20551

Yes, the flow you connect to the WebSocket has to take and emit Messages.

val source: Source[Message, SoMat] = ???

val flow: Flow[Message, Message, FMat] = ???

val sink: Sink[Message, SiMat] = ???

source.via(flow).runWith(sink)  // Ignoring which materializations you'd actually want to keep

This seems restrictive, but note some signatures:

// in Source (with types at least partially expanded)
def map[T](f: Message => T): Source[T, SoMat]

// in Flow (with types at least partially expanded)
def map[T](f: Message => T): Flow[Message, T, FMat]

// in Sink (with types at least partially expanded)
def contramap[T](f: T => Message): Sink[T, SiMat]

That is, you can take a Source[Message] and map it into a Source of any type you choose, provided you have a function from Message to that type. Likewise, you can take a Sink[Message] and contramap it into a Sink of any type, provided you have a function from that type to Message. And for a Flow, you can map its output into any type and later map it back into Message.

For example, you could write:

val outgoing = Source.maybe[Message]
val decoder: Flow[Message, String, NotUsed] = Flow[Message].mapConcat { m =>
  m match {
    case TextMessage.Strict(msg) => List(msg)
    case _ => Nil
  }
}

val stringProcessor: Flow[String, String, NotUsed] = Flow[String].map { s => s.replace(':', ';') }

val sink: Sink[String, Future[Done]] = Sink.foreach[Message] {
  case TextMessage.Strict(text) => println(text)
}.contramap[String] { s => TextMessage.Strict(s) }

val websocketFlow = Http().webSocketClientFlow(WebSocketRequest(uri))

val (upgradeResponse, closed) =
  outgoing
    .viaMat(websocketFlow)(Keep.right)
    .viaMat(decoder)(Keep.left)
    .viaMat(stringProcessor)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()

I would probably combine the websocketFlow and decoder into one flow from Message to String (or to whatever domain type the business logic works with; here that logic is stringProcessor), and keep using a contramapped Sink as above.

val stringsFromWebsocket: Flow[Message, String, Future[WebSocketUpgradeResponse]] =
  Http()
    .webSocketClientFlow(WebSocketRequest(uri))
    .viaMat(decoder)(Keep.left)

val (upgradeResponse, closed) =
  outgoing
    .viaMat(stringsFromWebsocket)(Keep.right)
    .viaMat(stringProcessor)(Keep.left)
    .toMat(sink)(Keep.both)
    .run()
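
The same idea can be taken one step further by adapting both sides of the WebSocket flow, so that the rest of the stream only ever sees String. The following is a minimal sketch, not part of the original code: the names StringWebSocketAdapter, encoder, asStringFlow and Demo are hypothetical, and it assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer) and that dropping non-strict and binary messages is acceptable. An echo Flow[Message] stands in for Http().webSocketClientFlow(...) so that the example runs without a server.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object StringWebSocketAdapter {
  // Outbound side: wrap each String in a strict text message.
  val encoder: Flow[String, Message, NotUsed] =
    Flow[String].map[Message](s => TextMessage(s))

  // Inbound side: keep strict text messages and drop everything else
  // (an assumption of this sketch; streamed and binary messages are ignored).
  val decoder: Flow[Message, String, NotUsed] =
    Flow[Message].collect { case TextMessage.Strict(text) => text }

  // Turn any Flow[Message, Message, M] into a Flow[String, String, M],
  // keeping the materialized value of the wrapped flow.
  def asStringFlow[M](ws: Flow[Message, Message, M]): Flow[String, String, M] =
    encoder.viaMat(ws)(Keep.right).viaMat(decoder)(Keep.left)
}

object Demo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("demo")

    // Stand-in for Http().webSocketClientFlow(WebSocketRequest(uri)):
    // it simply echoes every Message back.
    val echo: Flow[Message, Message, NotUsed] = Flow[Message]

    val result =
      Source(List("a:b", "c"))
        .via(StringWebSocketAdapter.asStringFlow(echo))
        .map(_.replace(':', ';')) // plain String processing, no Message in sight
        .runWith(Sink.seq)

    println(Await.result(result, 3.seconds))
    system.terminate()
  }
}
```

With the real client flow passed to asStringFlow, the materialized value would be the Future of the upgrade response, exactly as in stringsFromWebsocket above; only the element types change.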

Upvotes: 1
