Reputation: 27605
I want to upload file into S3 using Alpakka and at the same time parse it with Tika to obtain its MimeType.
I have 3 parts of graph at the moment:
val fileSource: Source[ByteString, Any] // comes from Akka-HTTP
val fileUpload: Sink[ByteString, Future[MultipartUploadResult]] // created by S3Client from Alpakka
val mimeTypeDetection: Sink[ByteString, Future[MediaType.Binary]] // my implementation using Apache Tika
I would like to obtain both results at one place, something like:
Future[(MultipartUploadResult, MediaType.Binary)]
I have no issue with broadcasting part:
val broadcast = builder.add(Broadcast[ByteString](2))
source ~> broadcast ~> fileUpload
broadcast ~> mimeTypeDetection
However I have a trouble composing Sinks. Methods I found in API and documentation assumes that either combined sinks are of the same type or that I am Zipping Flows, not Sinks.
What is suggested approach in such case?
Upvotes: 1
Views: 1324
Reputation: 9023
Two ways:
1) using alsoToMat
(easier, no GraphDSL, enough for your example)
val mat1: (Future[MultipartUploadResult], Future[Binary]) =
fileSource
.alsoToMat(fileUpload)(Keep.right)
.toMat(mimeTypeDetection)(Keep.both)
.run()
2) using GraphDSL with custom materialized values (more verbose, more flexible). More info on this in the docs)
val mat2: (Future[MultipartUploadResult], Future[Binary]) =
RunnableGraph.fromGraph(GraphDSL.create(fileUpload, mimeTypeDetection)((_, _)) { implicit builder =>
(fileUpload, mimeTypeDetection) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[ByteString](2))
fileSource ~> broadcast ~> fileUpload
broadcast ~> mimeTypeDetection
ClosedShape
}).run()
Upvotes: 5