softshipper
softshipper

Reputation: 34061

How figure out, if the webserver is still active

I forward the incoming messages from Kafka to a webserver via websocket client. The following code shows, how I am doing it:

import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.{Future, Promise}

final case class WsGraph(logger: Logger, sink: Sink[Message, Future[Done]])(implicit val system: ActorSystem) {


  private implicit val materializer = ActorMaterializer()
  private implicit val akka = system.settings.config.getConfig("akka.kafka.consumer")
  private implicit val executor = system.dispatcher
  private val consumerSetup = system.settings.config.getConfig("kafka.consumer.setup")
  private val wsSetup = system.settings.config.getConfig("websocket.setup")

  private val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(akka, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(consumerSetup.getString("bootStrapServers"))
      .withGroupId(consumerSetup.getString("groupId"))
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  private val kafkaAsSource: Source[Message, (Consumer.Control, Promise[Option[Message]])] = Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics(consumerSetup.getString("topics"))
    )
    .map(msg => TextMessage(msg.value()))
    .concatMat(Source.maybe[Message])(Keep.both)
    .mapAsync(Runtime.getRuntime.availableProcessors())(Future(_))


  private val socketFlow: Flow[Message, Message, (Consumer.Control, Promise[Option[Message]])] =
    Flow.fromSinkAndSourceMat(sink, kafkaAsSource)(Keep.right)


  private val (upgradeResponse, (draining, _)) =
    Http().singleWebSocketRequest(
      WebSocketRequest(wsSetup.getString("server")),
      socketFlow)

  val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      logger.info("Switching protocols")
      Right(Done)
    } else {
      Left(s"Connection failed: ${upgrade.response.status}")
    }
  }

  sys.addShutdownHook {
    draining.shutdown()
    logger.info("Draining websocket ressource.")
  }

} 

The problem here is, if the webserver can not be reached, the actor above gets closed. The question is, how to figure out, if the webserver is not more available, then the actor should restart and try to connect again.

Upvotes: 0

Views: 37

Answers (1)

Shanti Swarup Tunga
Shanti Swarup Tunga

Reputation: 641

I think your code

private val (upgradeResponse, (draining, _)) =
Http().singleWebSocketRequest(
  WebSocketRequest(wsSetup.getString("server")),
  socketFlow)

has return type

(Future[WebSocketUpgradeResponse], T)

As you are only using upgradeResponse i.e Future[WebSocketUpgradeResponse]

May be you can try rewriting your code using Recover with Retries

So you have to replace your

val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
  logger.info("Switching protocols")
  Right(Done)
} else {
  Left(s"Connection failed: ${upgrade.response.status}")
}
}

with

planB = Source.empty
Source.fromFuture(upgradeResponse).recoverWithRetries(3, {
   case ex: RuntimeException => logger.error("Error", ex); planB
}).runWith(Sink.ignore).map {upgrade =>
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
  logger.info("Switching protocols")
  Right(Done)
} else {
  Left(s"Connection failed: ${upgrade.response.status}")
}       
}

Here you can add exception handling for the RuntimeException Please refer https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-error.html for more details

I hope this will help. Please let me know in case any error. Thanks

Upvotes: 2

Related Questions