Reputation: 154
I am working on the below stream processing system to grab frames from one source, process, and send to another. I'm using a combination of akka-streams
and akka-http
through their scapa api. The pipeline is very short but I can't seem to locate where the system decides to stop after precisely 100 requests to the endpoint.
object frameProcessor extends App {
implicit val system: ActorSystem = ActorSystem("VideoStreamProcessor")
val decider: Supervision.Decider = _ => Supervision.Restart
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val dispatcher: ExecutionContextExecutor = system.dispatcher
val http = Http(system)
val sourceConnectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] = http.outgoingConnection(sourceUri)
val byteFlow: Flow[HttpResponse, Future[ByteString], NotUsed] =
Flow[HttpResponse].map(_.entity.dataBytes.runFold(ByteString.empty)(_ ++ _))
Source.repeat(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.map(postFrame)
.runWith(Sink.ignore)
.onComplete(_ => system.terminate())
def postFrame(imageBytes: Future[ByteString]): Unit = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
}
}
Fore reference, I'm using akka-streams
version 2.5.19 and akka-http
version 10.1.7. No error is thrown, no error codes on the source server where the frames come from, and the program exits with error code 0.
My application.conf
is as follows:
logging = "DEBUG"
Always 100 units processed.
Thanks!
Added logging to the stream like so
.onComplete{
case Success(res) => {
system.log.info(res.toString)
system.terminate()
}
case Failure(res) => {
system.log.error(res.getMessage)
system.terminate()
}
}
Received a connection reset exception but this is inconsistent. The stream completes with Done
.
Using .mapAsync(1)(postFrame)
I get the same Success(Done)
after precisely 100 requests. Additionally, when I check the nginx server access.log
and error.log
there are only 200
responses.
I had to modify postFrame
as follows to run mapAsync
def postFrame(imageBytes: Future[ByteString]): Future[Unit] = {
imageBytes.onComplete{
case Success(res) => system.log.info(s"post frame. ${res.length} bytes")
case Failure(_) => system.log.error("failed to post image!")
}
Future(Unit)
}
Upvotes: 3
Views: 564
Reputation: 154
I believe I have found the answer on on the Akka docs using delayed restarts with a backoff operator. Instead of sourcing direct from an unstable remote connection, I use RestartSource.withBackoff
and not RestartSource.onFailureWithBackoff
. The modified stream looks like;
val restartSource = RestartSource.withBackoff(
minBackoff = 100.milliseconds,
maxBackoff = 1.seconds,
randomFactor = 0.2
){ () =>
Source.single(HttpRequest(uri = sourceUri))
.via(sourceConnectionFlow)
.via(byteFlow)
.mapAsync(1)(postFrame)
}
restartSource
.runWith(Sink.ignore)
.onComplete{
x => {
println(x)
system.terminate()
}
}
I was not able to find the source of the problem but it seems this will work.
Upvotes: 2