Reputation: 34061
I forward incoming messages from Kafka to a web server through a WebSocket client. The following code shows how I do it:
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws._
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.{Future, Promise}
final case class WsGraph(logger: Logger, sink: Sink[Message, Future[Done]])(implicit val system: ActorSystem) {

  private implicit val materializer = ActorMaterializer()
  private implicit val akka = system.settings.config.getConfig("akka.kafka.consumer")
  private implicit val executor = system.dispatcher

  private val consumerSetup = system.settings.config.getConfig("kafka.consumer.setup")
  private val wsSetup = system.settings.config.getConfig("websocket.setup")

  private val consumerSettings: ConsumerSettings[String, String] =
    ConsumerSettings(akka, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(consumerSetup.getString("bootStrapServers"))
      .withGroupId(consumerSetup.getString("groupId"))
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  private val kafkaAsSource: Source[Message, (Consumer.Control, Promise[Option[Message]])] = Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics(consumerSetup.getString("topics"))
    )
    .map(msg => TextMessage(msg.value()))
    .concatMat(Source.maybe[Message])(Keep.both)
    .mapAsync(Runtime.getRuntime.availableProcessors())(Future(_))

  private val socketFlow: Flow[Message, Message, (Consumer.Control, Promise[Option[Message]])] =
    Flow.fromSinkAndSourceMat(sink, kafkaAsSource)(Keep.right)

  private val (upgradeResponse, (draining, _)) =
    Http().singleWebSocketRequest(
      WebSocketRequest(wsSetup.getString("server")),
      socketFlow)

  val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      logger.info("Switching protocols")
      Right(Done)
    } else {
      Left(s"Connection failed: ${upgrade.response.status}")
    }
  }

  sys.addShutdownHook {
    draining.shutdown()
    logger.info("Draining websocket ressource.")
  }
}
The problem is that if the web server cannot be reached, the actor above gets closed. How can I detect that the web server is no longer available, so that the actor restarts and tries to connect again?
Upvotes: 0
Views: 37
Reputation: 641
I think your code
private val (upgradeResponse, (draining, _)) =
  Http().singleWebSocketRequest(
    WebSocketRequest(wsSetup.getString("server")),
    socketFlow)
has return type
(Future[WebSocketUpgradeResponse], T)
Since the outcome of the connection attempt is carried by upgradeResponse,
i.e. a Future[WebSocketUpgradeResponse],
maybe you can try rewriting your code using recoverWithRetries.
To do so, replace your
val create: Future[Either[String, Done]] = upgradeResponse.map { upgrade =>
  // just like a regular http request we can access response status which is available via upgrade.response.status
  // status code 101 (Switching Protocols) indicates that server support WebSockets
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    logger.info("Switching protocols")
    Right(Done)
  } else {
    Left(s"Connection failed: ${upgrade.response.status}")
  }
}
with
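// fallback source used when the upgrade future fails; it emits nothing
val planB = Source.empty[WebSocketUpgradeResponse]
val create: Future[Either[String, Done]] =
  Source.fromFuture(upgradeResponse).recoverWithRetries(3, {
    // log the failure and switch to the fallback source
    case ex: RuntimeException => logger.error("Error", ex); planB
  }).runWith(Sink.headOption[WebSocketUpgradeResponse]).map { // Sink.ignore would discard the response
    // just like a regular http request we can access response status which is available via upgrade.response.status
    // status code 101 (Switching Protocols) indicates that server support WebSockets
    case Some(upgrade) if upgrade.response.status == StatusCodes.SwitchingProtocols =>
      logger.info("Switching protocols")
      Right(Done)
    case Some(upgrade) =>
      Left(s"Connection failed: ${upgrade.response.status}")
    case None =>
      // planB completed without emitting an upgrade response
      Left("Connection failed: no upgrade response received")
  }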
Here you can add exception handling for the RuntimeException
Please refer to https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-error.html for more details.
I hope this helps. Please let me know if you run into any errors. Thanks.
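One caveat: recoverWithRetries only switches to the fallback source when the stream fails. A Future that has already failed is not run again, so actually reconnecting means making the connection attempt again. Below is a minimal sketch of that idea using only the Scala standard library. The names RetrySketch, retry, fakeConnect and demo are hypothetical and not part of Akka. In your code the attempt would presumably be a function that calls Http().singleWebSocketRequest again.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RetrySketch {
  // Hypothetical helper: calls `attempt` again each time the returned Future fails,
  // until it succeeds or `retriesLeft` is used up (then the last failure is kept).
  def retry[T](retriesLeft: Int)(attempt: () => Future[T])(implicit ec: ExecutionContext): Future[T] =
    attempt().recoverWith {
      case NonFatal(_) if retriesLeft > 0 => retry(retriesLeft - 1)(attempt)
    }

  // Stand-in for a connection attempt: fails twice, then succeeds.
  def demo(): (String, Int) = {
    import ExecutionContext.Implicits.global
    val calls = new AtomicInteger(0)
    def fakeConnect(): Future[String] = Future {
      if (calls.incrementAndGet() < 3) throw new RuntimeException("server unreachable")
      "connected"
    }
    val result = Await.result(retry(3)(() => fakeConnect()), 5.seconds)
    (result, calls.get())
  }
}
```

Here RetrySketch.demo() returns ("connected", 3): the first two attempts fail and the third succeeds. The sketch retries immediately. For a real server you would probably want a delay or backoff between attempts.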
Upvotes: 2