Reputation: 1095
I have an S3 structure that's the result of a Spark job that writes partitioned CSV files like below.
bucketA
  output
    cleaned-data1
      part000....csv
      part001....csv
      part002....csv
    cleaned-data2
    .....
What I need is an Akka HTTP endpoint that, given an output folder name, downloads all of its part files as a single zip file: https://..../download/cleaned-data1
When this endpoint is called, ideally I want to:
Open a zip stream from the server to the client browser
Open up the part files and stream the bytes into the zip stream directly to the client, without any buffering on the server, to avoid memory issues
The total size of all parts can get up to 30GB uncompressed.
Is there a way to do this with Akka Streams, Akka HTTP, or Play? Can I use the Alpakka library?
Edit (temporary, based on Ramon's answer):
def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
  bucketNameToKeySource(bucket)
    .map(key => S3.download(bucket, key))
    .map(x => x.map(y => y.fold(Source.empty[ByteString])(_._1)))
    .flatMapConcat(identity)
    .flatMapConcat(identity)
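For reference, the two flatMapConcat calls are needed because, in the Alpakka 1.x API, S3.download materializes a Source containing an Option that wraps the inner data Source plus object metadata, so the stream has to be flattened twice. A type-annotated sketch of the same pipeline, under that assumption (a collect replaces the fold here purely for illustration):

```scala
import akka.NotUsed
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

// For each key, S3.download yields
//   Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed]
// so one flatMapConcat unwraps that outer Source, and a second one
// concatenates the inner byte streams of the individual part files.
def bucketNameToFileContents(bucket: String): Source[ByteString, _] =
  bucketNameToKeySource(bucket)
    .map(key => S3.download(bucket, key))
    .flatMapConcat(identity)                  // flatten the per-key download sources
    .collect { case Some((data, _)) => data } // skip missing keys, keep the byte source
    .flatMapConcat(identity)                  // concatenate the bytes of each part
```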
Upvotes: 0
Views: 1050
Reputation: 17973
The first step is to create an akka stream Source of the bucket contents:
type Key = String
type Key = String

def bucketNameToKeySource(bucket : String) : Source[Key, _] =
  S3.listBucket(bucket, None)
    .map(_.key)
This can now be combined with the S3 download capabilities and flatMapConcat:
def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
  bucketNameToKeySource(bucket)
    .map(key => S3.download(bucket, key))
    .flatMapConcat(identity)
    .map(_.fold(Source.empty[ByteString])(_._1))
    .flatMapConcat(identity)
This function can now be incorporated into your Route. The question asks to "open a zip stream from the server to the client", so encodeResponse is used:
def bucketNameToRoute(parentBucketName : String) : Route =
  encodeResponse {
    path("download" / Segment) { childBucketName =>
      val bucketName = parentBucketName + "/" + childBucketName
      val byteStrSource = bucketNameToFileContents(bucketName)
      complete(OK -> byteStrSource)
    }
  }
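Note that encodeResponse applies HTTP compression (gzip/deflate) to the concatenated byte stream; it does not produce a .zip archive with one entry per part file. If an actual zip download is required, the Alpakka file connector offers Archive.zip, which turns a stream of (ArchiveMetadata, Source[ByteString, _]) pairs into zipped bytes. A sketch under that assumption, reusing bucketNameToKeySource from above (bucketNameToZipRoute is a hypothetical name):

```scala
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Streams every part file under <parentBucketName>/<childBucketName>
// into a single zip archive, one entry per S3 key, without buffering
// whole files on the server.
def bucketNameToZipRoute(parentBucketName: String): Route =
  path("download" / Segment) { childBucketName =>
    val bucketName = parentBucketName + "/" + childBucketName
    val zipBytes: Source[ByteString, _] =
      bucketNameToKeySource(bucketName)
        .map { key =>
          // Each key becomes one zip entry named after the key.
          val entryBytes =
            S3.download(bucketName, key)
              .flatMapConcat {
                case Some((data, _)) => data
                case None            => Source.empty[ByteString]
              }
          (ArchiveMetadata(key), entryBytes)
        }
        .via(Archive.zip())
    complete(HttpEntity(ContentTypes.`application/octet-stream`, zipBytes))
  }
```

This requires the akka-stream-alpakka-file artifact in addition to the S3 connector; the zip central directory is written at the end of the stream, so the download still flows without server-side buffering.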
Upvotes: 1