I have an Akka stream from a WebSocket, like the one in akka stream consume web socket, and would like to build a reusable graph stage made of an inlet (the stream), a FlowShape (which adds an additional field to the JSON specifying its origin, i.e. …) and an outlet to Kafka.
I face the following 3 problems:
… from the web socket flow.
The sample project (flow only) looks like:
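import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher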
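val incoming: Sink[Message, Future[Done]] =
  Flow[Message].mapAsync(4) {
    case message: TextMessage.Strict =>
      println(message.text)
      Future.successful(Done)
    case message: TextMessage.Streamed =>
      message.textStream.runForeach(println)
    case message: BinaryMessage =>
      message.dataStream.runWith(Sink.ignore)
  }.toMat(Sink.last)(Keep.right)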
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://"))
val ((completionPromise, upgradeResponse), closed) =
  outgoing
    .viaMat(webSocketFlow)(Keep.both)
    .toMat(incoming)(Keep.both)
    // TODO not working integrating kafka here
    // .map(_.toString)
    // .map { elem =>
    //   println(s"PlainSinkProducer produce: ${elem}")
    //   new ProducerRecord[Array[Byte], String]("topic1", elem)
    // }
    // .runWith(Producer.plainSink(producerSettings))
    .run()
val connected = upgradeResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Future.successful(Done)
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}
// kafka that works / writes dummy data
val done1 = Source(1 to 100)
  .map { elem =>
    println(s"PlainSinkProducer produce: ${elem}")
    new ProducerRecord[Array[Byte], String]("topic1", elem.toString)
  }
  .runWith(Producer.plainSink(producerSettings))
One issue is around the incoming stage, which is modelled as a Sink where it should be modelled as a Flow, so that its messages can subsequently be fed into Kafka.
Because incoming text messages can be Streamed, you may want to use the flatMapMerge combinator to avoid the need to store entire (potentially big) messages in memory. The simpler version below uses mapAsync and folds each streamed message into a single string:
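import akka.NotUsed
val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) {
  case TextMessage.Strict(msg) =>
    Future.successful(Some(msg))
  case msg: BinaryMessage =>
    // binary data must still be consumed, then the message is dropped
    msg.dataStream.runWith(Sink.ignore).map(_ => None)
  case TextMessage.Streamed(src) =>
    src.runFold("")(_ + _).map { msg => Some(msg) }
}.collect {
  case Some(msg) => msg
}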
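A quick way to check a Message-to-String flow like this one without a live WebSocket endpoint is to feed it an in-memory Source of messages. The following is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem also provides the Materializer) with akka-stream and akka-http on the classpath; the object name IncomingFlowDemo and the sample messages are made up for illustration. It folds each streamed text message into one String in memory, and drains and drops binary messages.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical demo object: runs the flow over fixed messages instead of a socket.
object IncomingFlowDemo {
  // Akka 2.6 derives the implicit Materializer from this ActorSystem.
  implicit val system: ActorSystem = ActorSystem("incoming-demo")
  implicit val ec: ExecutionContext = system.dispatcher

  // One complete String per text message; binary messages are drained and dropped.
  val incoming: Flow[Message, String, NotUsed] =
    Flow[Message]
      .mapAsync[Option[String]](4) {
        case TextMessage.Strict(text) =>
          Future.successful(Some(text))
        case TextMessage.Streamed(textStream) =>
          // concatenates all chunks in memory
          textStream.runFold("")(_ + _).map(Some(_))
        case bm: BinaryMessage =>
          // the data stream must be consumed before the message is discarded
          bm.dataStream.runWith(Sink.ignore).map(_ => None)
      }
      .collect { case Some(text) => text }

  // Sample input: strict, streamed (split into two chunks), binary, strict.
  val sample: List[Message] = List(
    TextMessage.Strict("""{"op":"a"}"""),
    TextMessage.Streamed(Source(List("""{"op":""", """"b"}"""))),
    BinaryMessage.Strict(ByteString("bin")),
    TextMessage.Strict("""{"op":"c"}""")
  )

  val result: Seq[String] =
    try Await.result(Source(sample).via(incoming).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
}
```

Because mapAsync preserves upstream order, result holds the three JSON strings in the order they were sent, with the binary message removed.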
At this point you have something that produces strings and can be connected to Kafka:
val addOrigin: Flow[String, String, NotUsed] = ???
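val ((completionPromise, upgradeResponse), closed) =
  outgoing.viaMat(webSocketFlow)(Keep.both).via(incoming).via(addOrigin)
    .map { elem =>
      println(s"PlainSinkProducer produce: ${elem}")
      new ProducerRecord[Array[Byte], String]("topic1", elem)
    }
    .toMat(Producer.plainSink(producerSettings))(Keep.both)
    .run()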
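One possible way to fill in the ??? placeholder for addOrigin is to parse each string as a JSON object, add a field, and serialize it again. The sketch below assumes spray-json (the JSON library commonly paired with Akka HTTP) and Akka 2.6; the field name "origin", the value "example-source" and the object name AddOriginDemo are hypothetical, since the question does not say what the origin field should contain. A message that is not valid JSON, or not a JSON object, makes parseJson or asJsObject throw, which fails the stream unless a supervision strategy is added.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import spray.json._

import scala.concurrent.Await
import scala.concurrent.duration._

object AddOriginDemo {
  // Hypothetical field name: the question leaves the exact field unspecified.
  val OriginField = "origin"

  // Parses each message as a JSON object, adds (or overwrites) the origin
  // field and re-serializes it in compact form.
  def addOrigin(origin: String): Flow[String, String, NotUsed] =
    Flow[String].map { raw =>
      val obj = raw.parseJson.asJsObject
      JsObject(obj.fields + (OriginField -> JsString(origin))).compactPrint
    }

  // Akka 2.6 derives the implicit Materializer from this ActorSystem.
  implicit val system: ActorSystem = ActorSystem("add-origin-demo")

  val tagged: Seq[String] =
    try Await.result(
      Source.single("""{"op":"utx"}""")
        .via(addOrigin("example-source")) // hypothetical origin value
        .runWith(Sink.seq),
      5.seconds)
    finally system.terminate()
}
```

With such a stage, incoming.via(addOrigin(...)) yields tagged JSON strings that can then be mapped to ProducerRecords as in the snippet above. Field order in the re-serialized JSON follows the Map implementation and should not be relied on.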