Reputation: 764
I have two streams: A and B.
I want to start B only AFTER A has finished processing and writing its data to the file.
I tried to concat the two streams with:
A.concat(
  Source.lazily { () =>
    println("B is getting initialised")
    getStreamForB()
  }
)
But this is already initialising B BEFORE A has finished.
Upvotes: 0
Views: 228
Reputation: 17933
The FileIO.toPath method creates a Sink that materializes into a Future[IOResult]. If your stream A is writing to a file:
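val someDataSource : Source[ByteString, _] = ???
val filePath : Path = ???
val fileWriteOptions : Set[OpenOption] = ???
// runWith keeps the Sink's materialized value (the Future[IOResult]);
// .to(...).run() would return the Source's materialized value instead.
// An implicit Materializer (or ActorSystem) must be in scope.
val A : Future[IOResult] =
  someDataSource
    .runWith(FileIO.toPath(filePath, fileWriteOptions))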
You can use the materialized Future to kick off your stream B once the writing is completed:
val someProcessingWithTheDataOfB : Sink[ByteString, _] = ???
// FileIO.fromPath takes a path (and optionally a chunk size), not open options.
// Future#foreach needs an implicit ExecutionContext in scope.
A foreach { _ =>
  val B : Future[IOResult] =
    FileIO
      .fromPath(filePath)
      .to(someProcessingWithTheDataOfB)
      .run()
}
Similarly, you could check the IOResult before reading, to make sure there were no failures during the writing process:
A.filter(ioResult => ioResult.status.isSuccess)
  .foreach { _ =>
    val B : Future[IOResult] =
      FileIO
        .fromPath(filePath)
        .to(someProcessingWithTheDataOfB)
        .run()
  }
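If you also need B's result afterwards, chaining with flatMap instead of foreach keeps everything in one Future. Below is a minimal sketch of that idea, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer); the object name WriteThenRead, the temp file, and the sample data are made up for illustration and are not part of the question:

```scala
import java.nio.file.{Files, Path}

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object: writes with stream A, then reads with stream B.
object WriteThenRead {
  def run(): String = {
    // Assumption: Akka 2.6+, so the implicit system also supplies a Materializer.
    implicit val system: ActorSystem = ActorSystem("write-then-read")
    import system.dispatcher // ExecutionContext for flatMap/map

    val filePath: Path = Files.createTempFile("stream-a", ".txt")

    // Stream A: write two chunks to the file; the Future completes when writing ends.
    val A: Future[IOResult] =
      Source(List("hello ", "world"))
        .map(ByteString(_))
        .runWith(FileIO.toPath(filePath))

    // Stream B is only built and run inside flatMap, i.e. after A has completed.
    val B: Future[String] = A.flatMap { _ =>
      FileIO
        .fromPath(filePath)
        .runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
        .map(_.utf8String)
    }

    try Await.result(B, 10.seconds)
    finally {
      system.terminate()
      Files.deleteIfExists(filePath)
    }
  }
}
```

Calling WriteThenRead.run() should give back the text that A wrote. Await is only there to keep the demo short; in real code you would keep composing the Future.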
Upvotes: 1
Reputation: 19517
There is a ticket tracking the fact that Source#concat does not support lazy materialization. That ticket mentions the following work-around:
implicit class SourceLazyOps[E, M](val src: Source[E, M]) {
  def concatLazy[M1](src2: => Source[E, M1]): Source[E, NotUsed] =
    Source(List(() => src, () => src2)).flatMapConcat(_())
}
Applying the above implicit class to your case:
A.concatLazy(
  Source.lazily { () =>
    println("B is getting initialised")
    getStreamForB()
  }
)
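To see the ordering for yourself, here is a small self-contained sketch of the same work-around that records when A emits and when B gets created. It assumes Akka 2.6+ and Scala 2.12+; the helper is named concatDeferred here (a made-up name) because, as far as I know, newer Akka releases ship their own concatLazy operator, and a member method would win over the implicit one. ConcatLazyDemo, log, and the sample elements are also just for illustration:

```scala
import java.util.concurrent.atomic.AtomicReference

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object showing that B is created only after A completes.
object ConcatLazyDemo {

  // Same idea as the work-around from the ticket, under a different (made-up) name.
  implicit class SourceLazyOps[E, M](val src: Source[E, M]) {
    def concatDeferred[M1](src2: => Source[E, M1]): Source[E, NotUsed] =
      Source(List(() => src, () => src2)).flatMapConcat(_())
  }

  def run(): (Vector[String], Seq[Int]) = {
    // Assumption: Akka 2.6+, so the implicit system also supplies a Materializer.
    implicit val system: ActorSystem = ActorSystem("concat-lazy-demo")

    // Thread-safe event log so the main thread can read it after the stream ends.
    val events = new AtomicReference(Vector.empty[String])
    def log(msg: String): Unit = { events.updateAndGet(_ :+ msg); () }

    // Stand-in for getStreamForB() from the question.
    def getStreamForB(): Source[Int, NotUsed] = {
      log("B initialised")
      Source(List(3, 4))
    }

    val A: Source[Int, NotUsed] =
      Source(List(1, 2)).map { n => log(s"A emitted $n"); n }

    // getStreamForB() is passed by name, so it is not evaluated here.
    val done = A.concatDeferred(getStreamForB()).runWith(Sink.seq)

    try {
      val elements = Await.result(done, 10.seconds)
      (events.get(), elements)
    } finally system.terminate()
  }
}
```

With flatMapConcat only one inner source is active at a time, so the thunk for B is not called until A has completed; the recorded events should list both "A emitted" entries before "B initialised". Note that because the argument is already by-name, the extra Source.lazily wrapper is not strictly needed with this helper.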
Upvotes: 4