Reputation: 1
I've been exploring ZIO Streams and ZIO's concurrency model for a while. Recently, I started working on a toy project where I process a large file containing integers. Here's what I've done so far:
Now, I'm at the next stage: reading all these temporary files into streams, sorting their contents in-memory again, and writing the results to a new file. I'd like to enforce some memory limitations so that I can dive deeper into ZIO's stream framework. You can say I am stuck trying to implement k-way merge, in ZIO.
Here’s the code I have for creating a ZStream from a single file:
private def fileStream(file: Path): Task[Stream[Throwable, Int]] =
ZIO.succeed {
ZStream
.fromPath(file)
.via(ZPipeline.utf8Decode)
.via(ZPipeline.splitLines)
.mapZIO(line =>
ZIO
.attempt(line.trim.toInt)
.orElseFail(new IOException(s"Invalid line in file $file: $line"))
)
}
I can create a list of streams using ZIO.foreach:
ZIO.foreach(files)(fileStream)
However, I'm stuck on how to combine multiple ZStreams declaratively. My goal is to merge or combine these streams while respecting some memory constraints.
In theory, I was expecting some sort ofZStream#combine method to merge multiple streams, but I haven’t found a clear solution. How can I achieve this in ZIO Streams?
I tried using ZStream#fromIterable and ZStream#fromIteratorZIO constructors, however these are not combining the effects described (which is reading a line) so I don't think they are doing what I am intending to do.
Upvotes: 0
Views: 76