Christopher Chiche
Christopher Chiche

Reputation: 15335

Kafka topic to websocket

I am trying to implement a setup where I have multiple web browsers open a websocket connection to my akka-http server in order to read all messages posted to a kafka topic.

so the stream of messages should go this way

kafka topic -> akka-http -> websocket connection 1 
                         -> websocket connection 2
                         -> websocket connection 3 

For now I have created a path for the websocket:

val route: Route = 
 path("ws") {
   handleWebSocketMessages(notificationWs)
 }

Then I have created a consumer for my kafka topic:

val consumerSettings = ConsumerSettings(system,
  new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("topic1"))

And then finally I want to connect this source to the websocket in handleWebSocketMessages

def handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(source)::Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

Here is the error I get when I try to use source in the TextMessage:

Error:(77, 9) overloaded method value apply with alternatives: (textStream: akka.stream.scaladsl.Source[String,Any])akka.http.scaladsl.model.ws.TextMessage (text: String)akka.http.scaladsl.model.ws.TextMessage.Strict cannot be applied to (akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[Array[Byte],String],akka.kafka.scaladsl.Consumer.Control]) TextMessage(source)::Nil

I think I'm making numerous mistakes along the way but I would say that the most blocking part is the handleWebSocketMessages.

Upvotes: 3

Views: 1088

Answers (1)

n1r3
n1r3

Reputation: 8913

The first thing, is to understand that source is of type : Source[ConsumerRecord[K, V], Control]. So, it's not something that you could pass as an argument of a TextMessage.

Now, let's take the websocket's point of view:

  • An outgoing message is built for each message in the Kafka source. The message will be a TextMessage from a String transformation of the Kafka message.
  • For each incoming message, just println() it

So, the Flow can be seen as two components: the Source & the Sink.

val incomingMessages: Sink[Message, NotUsed] =
  Sink.foreach(println(_))

val outgoingMessages: Source[Message, NotUsed] =
  source
    .map { consumerRecord => TextMessage(consumerRecord.record.value) }

val handleWebSocketMessages: Flow[Message, Message, Any]  
  = Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

Hope it helps.

Upvotes: 3

Related Questions