shagoon

Reputation: 483

TimeoutException when consuming files from S3 with akka streams

I'm trying to consume a bunch of files from S3 in a streaming manner using akka streams:

S3.listBucket("<bucket>", Some("<common_prefix>"))
  .flatMapConcat { r => S3.download("<bucket>", r.key) }
  .mapConcat(_.toList)
  .flatMapConcat(_._1)
  .via(Compression.gunzip())
  .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
  .map(_.utf8String)
  .runForeach { x => println(x) }

Without increasing akka.http.host-connection-pool.response-entity-subscription-timeout I get

java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response entity body or call discardBytes() on it.

for the second file, just after printing the last line of the first file, when trying to access the first line of the second file.

I understand the nature of that exception. What I don't understand is why the request for the second file is already in progress while the first file is still being processed. I guess there's some buffering involved.

Any ideas how to get rid of that exception without having to increase akka.http.host-connection-pool.response-entity-subscription-timeout?

Upvotes: 1

Views: 613

Answers (1)

Sean Glover

Reputation: 1786

Instead of merging the processing of the downloaded files into one stream with flatMapConcat, you could try materializing a separate stream for each file inside the outer stream and fully processing it there before emitting its output downstream. That way the next object shouldn't be downloaded until the current one has been fully processed.

Generally you want to avoid too many stream materializations because each one adds overhead, but I suspect that overhead is negligible for an app performing network I/O like this.

Let me know if something like this works: (warning: untested)

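S3.listBucket("<bucket>", Some("<common_prefix>"))
  .mapAsync(1) { r =>
    // Materialize a separate stream per object and drain it completely
    // before the outer stream moves on to the next key.
    S3.download("<bucket>", r.key)
      .mapConcat(_.toList)   // download emits an Option, as in the question
      .flatMapConcat(_._1)
      .via(Compression.gunzip())
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
      .map(_.utf8String)
      // Note: Sink.seq holds all lines of one object in memory
      .runWith(Sink.seq)
  }
  .mapConcat(identity)
  .runForeach { x => println(x) }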
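
If you want to try the pattern without an S3 bucket, here is a minimal sketch of the same idea. It assumes Akka 2.6 (where an implicit ActorSystem can act as the materializer), and it replaces S3.download with a hypothetical fakeDownload that serves gzipped in-memory "files". The names PerFileMaterialization, fakeDownload and allLines are mine for illustration, not part of Alpakka.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Compression, Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PerFileMaterialization {
  // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Hypothetical stand-in for S3.download: one gzipped, newline-terminated "file".
  def fakeDownload(lines: List[String]): Source[ByteString, NotUsed] =
    Source
      .single(ByteString(lines.mkString("", "\n", "\n")))
      .via(Compression.gzip)

  // Each file is run as its own inner stream and collected before the
  // outer stream requests the next file (mapAsync with parallelism 1).
  def allLines(files: List[List[String]]): Future[Seq[String]] =
    Source(files)
      .mapAsync(1) { lines =>
        fakeDownload(lines)
          .via(Compression.gunzip())
          .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
          .map(_.utf8String)
          .runWith(Sink.seq)
      }
      .mapConcat(identity)
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    val files = List(List("a1", "a2"), List("b1"))
    // Lines come out in file order because mapAsync preserves ordering.
    Await.result(allLines(files), 10.seconds).foreach(println)
    system.terminate()
  }
}
```

The trade-off is the same as in the snippet above: every file's lines are collected in memory before they are emitted, so this only suits objects that comfortably fit in memory.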

Upvotes: 0
