Mateusz Kubuszok
Mateusz Kubuszok

Reputation: 27605

How to combine 2 Sinks of a different type?

I want to upload file into S3 using Alpakka and at the same time parse it with Tika to obtain its MimeType.

I have 3 parts of graph at the moment:

val fileSource: Source[ByteString, Any] // comes from Akka-HTTP
val fileUpload: Sink[ByteString, Future[MultipartUploadResult]] // created by S3Client from Alpakka
val mimeTypeDetection: Sink[ByteString, Future[MediaType.Binary]] // my implementation using Apache Tika

I would like to obtain both results at one place, something like:

Future[(MultipartUploadResult, MediaType.Binary)]

I have no issue with broadcasting part:

val broadcast = builder.add(Broadcast[ByteString](2))

source ~> broadcast ~> fileUpload
          broadcast ~> mimeTypeDetection

However I have a trouble composing Sinks. Methods I found in API and documentation assumes that either combined sinks are of the same type or that I am Zipping Flows, not Sinks.

What is suggested approach in such case?

Upvotes: 1

Views: 1324

Answers (1)

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

Two ways:

1) using alsoToMat (easier, no GraphDSL, enough for your example)

  val mat1: (Future[MultipartUploadResult], Future[Binary]) =
    fileSource
    .alsoToMat(fileUpload)(Keep.right)
    .toMat(mimeTypeDetection)(Keep.both)
    .run()

2) using GraphDSL with custom materialized values (more verbose, more flexible). More info on this in the docs)

  val mat2: (Future[MultipartUploadResult], Future[Binary]) = 
    RunnableGraph.fromGraph(GraphDSL.create(fileUpload, mimeTypeDetection)((_, _)) { implicit builder =>
      (fileUpload, mimeTypeDetection) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[ByteString](2))

        fileSource ~> broadcast ~> fileUpload
                      broadcast ~> mimeTypeDetection
        ClosedShape
    }).run()

Upvotes: 5

Related Questions