Reputation: 1904
I have an iterable that is generated by reading lines from a file. As I read the lines, I want to perform some filtering operations, but I also want to compute some metadata about the operation for logging. The source files are relatively large, so I would like to do this in a single scan of the data.
Toy example:
def getStrings(lst: Traversable[String]): Traversable[String] = {
  val someStrings = lst.filter(_.length >= 6)
  val stringCount = someStrings.foldLeft(0)((accum, line) => accum + 1)
  println(stringCount)
  someStrings
}
This example has the necessary functionality, but it performs two passes over the data structure: one to filter, and one to reduce. What I want to accomplish is some kind of "foldLeft" operation that computes a running tally, but also returns the original data structure. The idea being something like:
def getStrings(lst: Traversable[String]): Traversable[String] = {
  val strings = lst.filter(_.length >= 6).smoothOperator(0)((accum, line) => {
    if (line.isLast) {
      println(accum)
    } else {
      accum + 1
    }
  })
  strings
}
Ideally, the application of smoothOperator would parallelize nicely just like any other map operation and would add to the accumulator as data goes through it, thus minimizing the number of iterations over the data structure.
Is this possible?
Upvotes: 1
Views: 234
Reputation: 418
// with immutable List
def foldAndFilter1[A, M](orig: Iterable[A])(p: A => Boolean)(mEmpty: M)(mf: (M, A) => M): (M, Iterable[A]) =
  orig.foldLeft((mEmpty, List.empty[A])) { case ((meta, filtered), item) =>
    (mf(meta, item), if (p(item)) item :: filtered else filtered)
  } match { case (meta, list) => (meta, list.reverse) }
// with mutable ListBuffer
import scala.collection.mutable.ListBuffer
def foldAndFilter2[A, M](orig: Iterable[A])(p: A => Boolean)(mEmpty: M)(mf: (M, A) => M): (M, Iterable[A]) =
  orig.foldLeft((mEmpty, ListBuffer.empty[A])) { case ((meta, filtered), item) =>
    // += appends in place; :+ would copy the whole buffer on every match
    (mf(meta, item), if (p(item)) filtered += item else filtered)
  }
val rs1: (Int, Iterable[Int]) = foldAndFilter1((1 to 10).toList)(n => n % 2 == 0)(0)((m, _) => m + 1)
val rs2: (Int, Iterable[Int]) = foldAndFilter2((1 to 10).toList)(n => n % 2 == 0)(0)((m, _) => m + 1)
foldRight doesn't make sense here because it can be used efficiently only on an IndexedSeq, and effective parallelization can likewise be achieved only on an IndexedSeq, which provides a fast split operation.
This can also be expressed via Cats, but you need to have a Monoid for your M type:
import cats.{Applicative, Monoid, Traverse}
import cats.syntax.applicative._
def foldAndFilter3[F[_]: Traverse: Applicative, A, M](orig: F[A])(p: A => Boolean)(mf: (M, A) => M)(implicit fam: Monoid[F[A]], mm: Monoid[M]): (M, F[A]) =
  Traverse[F].foldLeft(orig, (mm.empty, fam.empty)) { case ((meta, filtered), item) =>
    // item.pure[F] lifts a single item into F so it can be combined with the result so far
    (mf(meta, item), if (p(item)) fam.combine(filtered, item.pure[F]) else filtered)
  }
import cats.instances.list._
import cats.instances.int._
val rs3: (Int, Iterable[Int]) = foldAndFilter3((1 to 10).toList)(n => n % 2 == 0)((m: Int, _) => m + 1)
Any generalization, on the one hand, leads to more readable, easier-to-write code, but on the other hand to code that is less optimized for the particular case.
Upvotes: 1
Reputation: 27605
What you need is IMHO isomorphic to running foldLeft but on something that does several folds at once:
def doubleFoldLeft[A, B, C](traversable: Traversable[A], firstZero: B, secondZero: C)(
  firstFold: (B, A) => B
)(
  secondFold: (C, A) => C
): (B, C) = traversable.foldLeft(firstZero -> secondZero) {
  case ((b, c), a) =>
    firstFold(b, a) -> secondFold(c, a)
}
What you are asking for exactly, though, would require building the running result dynamically, so that you would end up with something like:
def zipWithFoldLeftUntilElement[A, B](
  traversable: Traversable[A],
  zero: B
)(fold: (B, A) => B): Traversable[(A, B)] = ...
With that you would still end up using fold at the end:
zipWithFoldLeftUntilElement(myTraversable.filter(...), zero) {
  ...
}.foldLeft(anotherZero) { case ((a, b), c) =>
  ... // do something with a and c
  b -> c
} // (B, C) - tuple of 2 fold results
Long story short, if you want to consume the stream once but compute several things in parallel, just compute more than one thing in a .foldLeft. If your logic is more complicated than that, I would consider using reactive streams like Akka Streams or FS2. If the logic is really messed up, I would try Akka Streams graphs.
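As a rough illustration of computing more than one thing in a single foldLeft, here is a minimal sketch. The Stats case class, the scan method, and the particular metadata fields are hypothetical names chosen for this example; they are not part of the question or of any library.

```scala
object MultiFold {
  // Hypothetical accumulator: every field is updated during the same pass.
  final case class Stats(kept: Vector[String], keptCount: Int, droppedCount: Int, longest: Int)

  // Folds once over the input, filtering by length and gathering metadata on the way.
  def scan(lines: Iterable[String], minLen: Int): Stats =
    lines.foldLeft(Stats(Vector.empty, 0, 0, 0)) { (s, line) =>
      val longest = math.max(s.longest, line.length)
      if (line.length >= minLen)
        s.copy(kept = s.kept :+ line, keptCount = s.keptCount + 1, longest = longest)
      else
        s.copy(droppedCount = s.droppedCount + 1, longest = longest)
    }
}
```

Calling MultiFold.scan(lines, 6) returns the kept lines together with all the counters, so logging can happen after the single traversal instead of requiring a second one.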
Upvotes: 4
Reputation: 10834
Instead of folding the filtered list of strings, fold the original list and accumulate into a tuple. That tuple would accumulate the lines from the original list that match the filter criterion into one component and the result of the original accumulation function into the other component.
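A minimal sketch of that idea, assuming the filter and count from the question's toy example. The names TupleFold and filterAndCount are made up for illustration, and Vector is just one reasonable choice for the collected lines.

```scala
object TupleFold {
  // Hypothetical helper: keeps strings of length >= minLen and counts them,
  // visiting each element of the original input exactly once.
  def filterAndCount(lines: Iterable[String], minLen: Int): (Vector[String], Int) =
    lines.foldLeft((Vector.empty[String], 0)) {
      case ((kept, count), line) =>
        if (line.length >= minLen) (kept :+ line, count + 1) // appending keeps input order
        else (kept, count)
    }
}
```

The first tuple component holds the lines that passed the filter and the second holds the running count, so the caller can destructure the result with val (kept, count) = TupleFold.filterAndCount(lines, 6).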
Upvotes: 0
Reputation: 1163
Here is my solution
object GetStringsTest extends App {
  def getStringsOriginal(lst: Traversable[String]): Traversable[String] = {
    val someStrings = lst.filter(_.length >= 6)
    val stringCount = someStrings.foldLeft(0)((accum, line) => accum + 1)
    println(stringCount)
    someStrings
  }
  def getStringsOnePass(lst: Traversable[String]): Traversable[String] = {
    val folded = lst.foldRight((IndexedSeq[String](), 0)) { (e, acc) =>
      if (e.length >= 6) (e +: acc._1, acc._2 + 1)
      else acc
    }
    println(folded._2)
    folded._1
  }
  val myList = List("hi", "defenestration", "supercilious", "football", "tea")
  println(getStringsOriginal(myList))
  println(getStringsOnePass(myList))
}
I used foldRight just in case you want to swap out IndexedSeq for List. If you are building up a List by prepending, you need to use foldRight instead of foldLeft to keep the elements in their original order.
My solution returns a sequence with the correct contents, but it may be of a different type than the input Traversable. The output will always be of type IndexedSeq[String], so if the input is of type List[String], the output will be of a different type.
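For reference, swapping IndexedSeq for List might look like the sketch below. ListVariant and filterAndCountList are hypothetical names, and the method takes an Iterable and returns the count instead of printing it, which is an assumption made to keep the example easy to check.

```scala
object ListVariant {
  // Assumed variant of the one-pass fold that builds a List.
  // foldRight processes elements from the right, so prepending with ::
  // leaves the kept strings in their original order.
  def filterAndCountList(lst: Iterable[String]): (List[String], Int) =
    lst.foldRight((List.empty[String], 0)) { (e, acc) =>
      if (e.length >= 6) (e :: acc._1, acc._2 + 1)
      else acc
    }
}
```

With foldLeft and the same prepend, the kept strings would come out reversed and would need a final reverse.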
Upvotes: 2