Reputation: 255
I have 2 CSV files with sorted data:
File 1: sorted numbers (~1 GB)
File 2: sorted numbers + extra data (~20 GB)
I need to look up every number from File 1 in File 2 and do some processing (numbers in File 2 that are not present in File 1 are skipped).
So far I have:
import java.nio.file.Paths

import cats.effect.{Blocker, ContextShift, ExitCode, IO, IOApp, Sync}
import fs2.{Pipe, Stream, text}
import fs2.io.file.readAll

object MainQueue extends IOApp {

  override def run(args: List[String]): IO[ExitCode] =
    program[IO]().compile.drain.as(ExitCode.Success)

  def program[F[_]: Sync: ContextShift](): Stream[F, Unit] =
    for {
      number <- numberStream()
      record <- records()
        .through(parser())
        .through(findRecord(number))
      _ <- Stream.emit(println(s"$number <-> $record"))
    } yield ()

  // Drops records below `phone`, then emits the first remaining one.
  def findRecord[F[_]](phone: Long): Pipe[F, Long, Long] =
    _.dropWhile { r =>
      println(s"Reading $r")
      r < phone
    }.head // halts the stream

  def numberStream[F[_]](): Stream[F, Long] =
    Stream(100L, 120L)

  // TODO: make stream continue and not halt and restart
  def records[F[_]: Sync: ContextShift](): Stream[F, String] =
    Stream
      .resource(Blocker[F])
      .flatMap { blocker =>
        readAll[F](Paths.get("small.csv"), blocker, 4096)
      }
      .through(text.utf8Decode)
      .through(text.lines)

  def parser[F[_]](): Pipe[F, String, Long] = ??? // parse a line into a number

  def writer[F[_]](): Pipe[F, Long, Unit] =
    _.map(v => println(s"Found: $v"))
}
Which prints:
Reading 50
Reading 100
100 <-> 100
Reading 50
Reading 100
Reading 120
120 <-> 120
Which means the 2nd stream restarts for each value in File 1. How do I keep the position of the last read and continue from there? The numbers are sorted, so there is no point in starting over. I am super new to Scala and fs2, so an explanation of what I am misunderstanding would be much appreciated.
Thanks!
Upvotes: 3
Views: 1341
Reputation: 465
First of all, you need to know that

for {
  number <- numberStream()
  record <- records()
    .through(parser())
    .through(findRecord(number))
  _ <- Stream.emit(println(s"$number <-> $record"))
} yield ()

is just syntactic sugar for

numberStream()
  .flatMap { number =>
    records()
      .through(parser())
      .through(findRecord(number))
      .map(record => (record, number))
  }
  .flatMap { case (record, number) => Stream.emit(println(s"$number <-> $record")) }

which means you evaluate the inner stream, records().through(parser()).through(findRecord(number)), once for each element of numberStream.
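To see that restarting behaviour in isolation, here is a minimal sketch (not from the original post; the printed messages are illustrative). The inner stream passed to flatMap is rebuilt and re-run, effects included, for every outer element, which is exactly why the file is re-read for each number:

import cats.effect.IO
import fs2.Stream

val demo: Stream[IO, Int] =
  Stream(1, 2, 3).flatMap { n =>
    // this effect runs once per outer element
    Stream.eval(IO(println(s"inner stream (re)started for $n"))).as(n)
  }

// demo.compile.drain.unsafeRunSync() prints the message three times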
To keep the last position of a pointer into File 2 you could count consumed bytes, but you would still need to reopen the connection to File 2 for each number from File 1.
The operation you're trying to achieve is conditional zipping, so you should take a look at methods like fs2.Stream#zip, fs2.Stream#zipAll, etc.; they let you zip the records while opening each file only once.
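For instance, zip pairs elements positionally, pulling from each stream exactly once (a quick illustration with pure streams, not from the original answer):

import fs2.Stream

Stream(1, 2, 3).zip(Stream("a", "b")).toList
// List((1,a), (2,b)) -- the shorter stream determines the length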
Zip-like methods don't do exactly what you're looking for, but it's quite easy to implement the requested functionality with fs2.Pull; here is an example:
import fs2.{Pull, RaiseThrowable, Stream}

def zipToLeft[F[_]: RaiseThrowable, O1, O2](in1: Stream[F, O1], in2: Stream[F, O2])
                                           (condition: (O1, O2) => Boolean): Stream[F, (O1, O2)] = {
  def go(s1: Stream[F, O1], s2: Stream[F, O2]): Pull[F, (O1, O2), Unit] =
    s1.pull.uncons1.flatMap {
      case Some((hd1, tl1)) => s2.pull.uncons1.flatMap {
        case Some((hd2, tl2)) =>
          if (condition(hd1, hd2)) Pull.output1((hd1, hd2)) >> go(tl1, tl2)
          // no match: keep the current left element (pushed back with cons1
          // rather than re-pulling s1, which would restart it) and advance the right
          else go(tl1.cons1(hd1), tl2)
        case None => Pull.raiseError[F](new RuntimeException("right stream exhausted"))
      }
      case None => Pull.done
    }
  go(in1, in2).stream
}
And you can use it this way:
override def run(args: List[String]): IO[ExitCode] =
  for {
    result <- program[IO]().compile.toList
    _ <- IO(println(result))
  } yield ExitCode.Success

def program[F[_]: Sync: ContextShift]() =
  zipToLeft(numberStream(), records()) { case (v1, v2) => v1 == v2._1 }

def numberStream[F[_]](): Stream[F, Long] =
  Stream.emits(Vector(1, 3, 6, 7, 9))

def records[F[_]: Sync: ContextShift](): Stream[F, (Int, String)] =
  Stream.emits(Vector.range(1, 10).map(i => (i, i.toString)))
Output: List((1,(1,1)), (3,(3,3)), (6,(6,6)), (7,(7,7)), (9,(9,9)))
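Applied back to the two sorted CSV files from the question, the same combinator gives a single pass over each file. The following is only a sketch: the file names and both parsers are hypothetical stand-ins, since the real CSV layout isn't shown.

import java.nio.file.Paths

import cats.effect.{Blocker, ContextShift, Sync}
import fs2.{Stream, text}
import fs2.io.file.readAll

// Hypothetical parsers for the two layouts.
def parseNumber(line: String): Long = line.trim.toLong
def parseRecord(line: String): (Long, String) = {
  val Array(n, rest) = line.split(",", 2)
  (n.trim.toLong, rest)
}

// Each file is opened and traversed exactly once.
def fileProgram[F[_]: Sync: ContextShift](blocker: Blocker): Stream[F, (Long, (Long, String))] = {
  def lines(path: String): Stream[F, String] =
    readAll[F](Paths.get(path), blocker, 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(_.nonEmpty) // drop the trailing empty line

  zipToLeft(lines("numbers.csv").map(parseNumber),
            lines("records.csv").map(parseRecord)) { case (n, rec) => n == rec._1 }
}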
Upvotes: 2