laidback
laidback

Reputation: 555

Akka Stream write to multiple Files

As described in the Documentation it is possible to have reusable Components like a lineSink writing into a file.

I would like to know how to rewrite the lineSink component from the documentation to be able to fulfill the following scenario:

val source = Source(1 to 10)
val flow = Flow[Int].map(_.toString)

/** Rewrite of the lineSink to enable new files with content */
val fileSink = Sink[String, Future[IOResult]] = 
  Flow[String]
    .map(s => ByteString(s + "\n"))
    .toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)

/** After running the graph i want to have a new file for each item processed */
val graph = source.via(flow).to(sink)

Upvotes: 1

Views: 1094

Answers (1)

Stefano Bonetti
Stefano Bonetti

Reputation: 9023

You are going to need to materialize one new flow for each new incoming element. As the FileIO.toPath sink materializes to a Future, you can achieve this using mapAsync(with a sensible parallelism choice). See example below:

  val source = Source(1 to 10)
  val flow: Flow[Int, String, NotUsed] = Flow[Int].map(_.toString)

  val fileFlow: Flow[String, IOResult, NotUsed] =
    Flow[String].mapAsync(parallelism = 4){ s ⇒
      Source.single(ByteString(s)).runWith(FileIO.toPath(Paths.get(fileName)))
    }

  val fileSink: Sink[IOResult, Future[Done]] = Sink.foreach[IOResult]{println}

  val graph = source.via(flow).via(fileFlow).to(fileSink)

Note that you'll need to generate a proper fileName for each incoming element. You'll need to come up with a way to do that.

Upvotes: 3

Related Questions