spaudanjo

Reputation: 764

Akka Streams: Concatenating streams that initialise resources

I have two streams: A and B.

I want to start B only AFTER A has finished processing and writing its data to the file.

I tried to concat the two streams with:

A.concat(
  Source.lazily { () =>
    println("B is getting initialised")
    getStreamForB()
  }
)

But this initialises B BEFORE A has finished.

Upvotes: 0

Views: 228

Answers (2)

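FileIO.toPath creates a Sink that materializes a Future[IOResult]. If stream A writes to a file, run it so that the sink's materialized value is kept (runWith does this, whereas .to(...).run() would return the source's materialized value instead):

import java.nio.file.{OpenOption, Path}
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

val someDataSource : Source[ByteString, _] = ???

val filePath : Path = ???

val fileWriteOptions : Set[OpenOption] = ???

// Needs an implicit materializer (or, in Akka 2.6+, an implicit ActorSystem) in scope
val A : Future[IOResult] =
  someDataSource
    .runWith(FileIO.toPath(filePath, fileWriteOptions))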

You can use the materialized Future to kick off stream B once the write has completed successfully:

val someProcessingWithTheDataOfB : Sink[ByteString, _] = ???

// Future#foreach needs an implicit ExecutionContext in scope and only runs if A succeeds
A foreach { _ =>

  // FileIO.fromPath takes a path (and optionally a chunk size), not a set of OpenOptions
  val B : Future[IOResult] =
    FileIO
      .fromPath(filePath)
      .to(someProcessingWithTheDataOfB)
      .run()
}

Similarly, you can check the IOResult before reading to make sure the write did not fail:

A.filter(ioResult => ioResult.status.isSuccess)
 .foreach { _ =>
   val B : Future[IOResult] =
     FileIO
       .fromPath(filePath)
       .to(someProcessingWithTheDataOfB)
       .run()
 }
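
To make the sequencing above concrete, here is a minimal end-to-end sketch, assuming Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream). The object name WriteThenRead, the temp file, and the two sample lines are made up for illustration. It chains the read onto the write with flatMap, so a failed write Future skips the read entirely:

```scala
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object: writes two lines with stream A, then reads them back with stream B.
object WriteThenRead {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("write-then-read")
    import system.dispatcher // ExecutionContext for the Future combinators

    val filePath: Path = Files.createTempFile("stream-a", ".txt")
    try {
      // Stream A: runWith keeps the sink's materialized Future[IOResult].
      val written: Future[IOResult] =
        Source(List("first\n", "second\n"))
          .map(s => ByteString(s))
          .runWith(FileIO.toPath(filePath))

      // Stream B is only built and run once the write Future has completed successfully.
      val contents: Future[String] = written.flatMap { _ =>
        FileIO
          .fromPath(filePath)
          .runFold(ByteString.empty)(_ ++ _)
          .map(_.utf8String)
      }

      Await.result(contents, 5.seconds)
    } finally {
      system.terminate()
      Files.deleteIfExists(filePath)
    }
  }

  def main(args: Array[String]): Unit =
    print(run())
}
```

Using flatMap instead of foreach also gives you a Future for B's result, which is handy if something downstream has to wait for B as well.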

Upvotes: 1

Jeffrey Chung

Reputation: 19517

There is a ticket tracking the fact that Source#concat does not support lazy materialization. That ticket mentions the following work-around:

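import akka.NotUsed
import akka.stream.scaladsl.Source

implicit class SourceLazyOps[E, M](val src: Source[E, M]) {
  // src2 is by-name, so it is only evaluated when flatMapConcat reaches the second element
  def concatLazy[M1](src2: => Source[E, M1]): Source[E, NotUsed] =
    Source(List(() => src, () => src2)).flatMapConcat(_())
}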

Applying the above implicit class to your case:

A.concatLazy(
  Source.lazily { () =>
    println("B is getting initialised")
    getStreamForB()
  }
)
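
If you want to check the ordering yourself, here is a small sketch, again assuming Akka 2.6 or later. ConcatLazyDemo, the record helper, and the integer sources are hypothetical stand-ins, and getStreamForB here is a stub rather than your real function. It records when A emits and when B is initialised, so you can compare the order. As far as I know, more recent Akka releases also ship a built-in concatLazy operator, so check the docs for your version before copying the implicit class.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object ConcatLazyDemo {
  // Workaround from the ticket, as quoted in the answer above.
  implicit class SourceLazyOps[E, M](val src: Source[E, M]) {
    def concatLazy[M1](src2: => Source[E, M1]): Source[E, NotUsed] =
      Source(List(() => src, () => src2)).flatMapConcat(_())
  }

  // Runs A followed by a lazily created B.
  // Returns the emitted elements and the recorded events, in order.
  def run(): (Seq[Int], Vector[String]) = {
    implicit val system: ActorSystem = ActorSystem("concat-lazy-demo")

    val lock = new Object
    var events = Vector.empty[String]
    def record(e: String): Unit = lock.synchronized { events :+= e }

    // Stand-in for stream A: records every element it emits.
    val A: Source[Int, NotUsed] =
      Source(1 to 3).map { i => record(s"A emits $i"); i }

    // Stub for the asker's getStreamForB(): records when it is called.
    def getStreamForB(): Source[Int, NotUsed] = {
      record("B is getting initialised")
      Source(4 to 6)
    }

    try {
      val combined = A.concatLazy(Source.lazily(() => getStreamForB()))
      val elements = Await.result(combined.runWith(Sink.seq), 5.seconds)
      (elements, lock.synchronized(events))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (elements, events) = run()
    println(elements.mkString(", "))
    events.foreach(println)
  }
}
```

The "B is getting initialised" event should only show up after all of A's elements, because flatMapConcat does not ask for the second thunk until the first inner source has completed.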

Upvotes: 4
