suriyanto

Reputation: 1095

Streaming download multiple files from S3 as zip through Akka HTTP or Play

I have an S3 structure that's the result of a Spark job that writes partitioned CSV files like below.

bucketA
  output
    cleaned-data1
      part000....csv
      part001....csv
      part002....csv
    cleaned-data2
      .....

What I need is an Akka HTTP endpoint that takes the output name and downloads all of its parts as a single zip file: https://..../download/cleaned-data1.

When this endpoint is called, ideally I want to:

  1. Open a zip stream from the server to the client browser

  2. Open the part files and stream their bytes into the zip stream directly to the client, without buffering on the server, to avoid memory issues

The total size of all parts can get up to 30GB uncompressed.
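
To make the intended behaviour concrete, here is a minimal sketch that uses only java.util.zip and no Akka. The object name ZipStreamingSketch, the helper names, and the 8 KB buffer size are assumptions for illustration. Each part is copied into its own zip entry through a fixed-size buffer, so memory use does not grow with the size of the parts.

```scala
import java.io.{ByteArrayInputStream, InputStream, OutputStream}
import java.util.zip.{ZipEntry, ZipInputStream, ZipOutputStream}

// Hypothetical helper, for illustration only.
object ZipStreamingSketch {

  // Writes each named part as one zip entry. Parts are opened lazily, one at
  // a time, and copied through a single reusable buffer of bufferSize bytes.
  def zipParts(parts: Iterator[(String, () => InputStream)],
               out: OutputStream,
               bufferSize: Int = 8192): Unit = {
    val zip = new ZipOutputStream(out)
    val buffer = new Array[Byte](bufferSize)
    try {
      parts.foreach { case (name, open) =>
        val in = open()
        try {
          zip.putNextEntry(new ZipEntry(name))
          var read = in.read(buffer)
          while (read != -1) {
            zip.write(buffer, 0, read)
            read = in.read(buffer)
          }
          zip.closeEntry()
        } finally in.close()
      }
    } finally zip.close()
  }

  // Reads the entry names back, to check the structure of an archive.
  def entryNames(bytes: Array[Byte]): List[String] = {
    val zin = new ZipInputStream(new ByteArrayInputStream(bytes))
    try Iterator.continually(zin.getNextEntry).takeWhile(_ != null).map(_.getName).toList
    finally zin.close()
  }
}
```

In the real endpoint the OutputStream would have to be connected to the HTTP response and the InputStreams to the S3 objects. That wiring is the open part of the question and is not shown here.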

Is there a way to do this through Akka Stream, Akka HTTP or Play? Can I utilize the Alpakka library?

Edit: a temporary version based on Ramon's answer:

  def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
    bucketNameToKeySource(bucket)
      .map(key => S3.download(bucket, key))
      .map(x => x.map(y => y.fold(Source.empty[ByteString])(_._1)))
      .flatMapConcat(identity)
      .flatMapConcat(identity)

Upvotes: 0

Views: 1050

Answers (1)

The first step is to create an akka stream Source of the bucket contents:

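import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source

type Key = String

// Lists the objects in the bucket and keeps only their keys
def bucketNameToKeySource(bucket : String) : Source[Key, _] =
  S3.listBucket(bucket, None)
    .map(_.key)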

This can now be combined with the S3 download capabilities and flatMapConcat:

def bucketNameToFileContents(bucket : String) : Source[ByteString, _] = 
  bucketNameToKeySource(bucket)
    .map(key => S3.download(bucket, key))
    .map(_.getOrElse(Source.empty[ByteString]))
    .flatMapConcat(identity)
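
As a rough plain-Scala analogy for what this concatenation produces (not Akka code; ConcatSketch and concatInOrder are hypothetical names), each file can be modelled as a function that opens an iterator of chunks. The chunks come out in file order as one flat sequence, so file names and boundaries are not part of the output.

```scala
// Hypothetical analogy using Iterator instead of an Akka Source.
object ConcatSketch {

  // Each "file" is a function that opens an iterator over its chunks.
  // The result yields every chunk of the first file, then the second, and so on.
  def concatInOrder(files: Iterator[() => Iterator[String]]): Iterator[String] =
    files.flatMap(open => open())
}
```

Because the boundaries are lost, the receiving side sees one continuous byte stream rather than separate part files.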

This function can now be incorporated into your Route. The question asks to "open a zip stream from the server to the client", so encodeResponse is used to compress the response:

def bucketNameToRoute(parentBucketName : String) : Route = 
  encodeResponse {
    path ("download" / Segment) { childBucketName =>

      val bucketName = parentBucketName + "/" + childBucketName

      val byteStrSource = bucketNameToFileContents(bucketName)

      complete(OK -> byteStrSource)
    } 
  }
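
One caveat worth checking: encodeResponse negotiates an HTTP content encoding (gzip or deflate, based on the client's Accept-Encoding header), which is different from a zip archive with named entries. The sketch below uses only java.util.zip to show the difference between the two formats. The object name EncodingVsArchive and the sample payload are assumptions for illustration.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{GZIPOutputStream, ZipEntry, ZipOutputStream}

// Hypothetical helper, for illustration only.
object EncodingVsArchive {
  private val payload = "a,b\n1,2\n".getBytes("UTF-8")

  // gzip compresses a single byte stream and has no notion of file entries.
  def gzipBytes: Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(payload)
    gz.close()
    bos.toByteArray
  }

  // zip is an archive format: each file is stored as a named entry.
  def zipBytes: Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val zip = new ZipOutputStream(bos)
    zip.putNextEntry(new ZipEntry("part000.csv"))
    zip.write(payload)
    zip.closeEntry()
    zip.close()
    bos.toByteArray
  }

  // First two bytes as hex: the format's magic number.
  def magic(bytes: Array[Byte]): String =
    bytes.take(2).map(b => f"${b & 0xff}%02x").mkString
}
```

A gzip stream starts with the bytes 1f 8b, while a zip archive starts with 50 4b ("PK"). If the client needs a .zip file containing the individual parts, the response body itself would have to be written in the zip format rather than relying on content encoding.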

Upvotes: 1
