Michael K
Michael K

Reputation: 2288

Is it possible to combine multiple map and reduce functions into a single pass in Scala?

I have multiple map functions running over the same data and I'd like to have them run in a single pass. I'm looking for a generic way to do this.

val fruits: Seq[String] = Seq("apple", "banana", "cherry")

def mapF(s: String): Char = s.head
def reduceF(c1: Char, c2: Char): Char = if(c1 > c2) c1 else c2

def mapG(s: String): Int = s.length
def reduceG(i1: Int, i2: Int): Int = i1 + i2

val largestStartingChar = fruits.map(mapF).reduce(reduceF)
val totalStringLength = fruits.map(mapG).reduce(reduceG)

I'd like to reduce the number of passes over fruits. I can make this generic for two maps and reduces like this:

def productMapFunction[A, B, C](f: A=>B, g: A=>C): A => (B, C) = {
  x => (f(x), g(x))
}

def productReduceFunction[T, U](f: (T, T)=>T, g: (U, U) => U):
    ((T,U), (T,U)) => (T, U) = {
  (tu1, tu2) => (f(tu1._1, tu2._1), g(tu1._2, tu2._2))
}

val xMapFG = productMapFunction(mapF, mapG)
val xReduceFG = productReduceFunction(reduceF, reduceG)

val (largestStartingChar2, totalStringLength2) = 
  fruits.map(xMapFG).reduce(xReduceFG))

I'd like to do this even more generically, with an arbitrary number of map and reduce functions, but I'm not sure how to proceed, or if this is possible.

Upvotes: 5

Views: 643

Answers (3)

Yuriy Tumakha
Yuriy Tumakha

Reputation: 1570

Following solution uses Cats 2 and custom type MapReduce.

Reducing operation can be specified by function reduce: (O, O) => O or cats reducer: Semigroup[O]. Multiple MapReduce objects can be combined into one by Apply instance provided by implicit def mapReduceApply[I]

import cats._
import cats.implicits._

trait MapReduce[I, O] {
  type R

  def reducer: Semigroup[R]

  def map: I => R

  def mapResult: R => O

  def apply(input: Seq[I]): O = mapResult(input.map(map).reduce(reducer.combine))
}

object MapReduce {
  def apply[I, O, _R](_reducer: Semigroup[_R], _map: I => _R, _mapResult: _R => O): MapReduce[I, O] =
    new MapReduce[I, O] {
      override type R = _R

      override def reducer = _reducer

      override def map = _map

      override def mapResult = _mapResult
    }

  def apply[I, O](map: I => O)(implicit r: Semigroup[O]): MapReduce[I, O] =
    MapReduce[I, O, O](r, map, identity)

  def apply[I, O](map: I => O, reduce: (O, O) => O): MapReduce[I, O] = {
    val reducer = new Semigroup[O] {
      override def combine(x: O, y: O): O = reduce(x, y)
    }
    MapReduce(map)(reducer)
  }

  implicit def mapReduceApply[I] =
    new Apply[({type F[X] = MapReduce[I, X]})#F] {
      override def map[A, B](f: MapReduce[I, A])(fn: A => B): MapReduce[I, B] =
        MapReduce(f.reducer, f.map, f.mapResult.andThen(fn))

      override def ap[A, B](ff: MapReduce[I, (A) => B])(fa: MapReduce[I, A]): MapReduce[I, B] =
        MapReduce(ff.reducer product fa.reducer,
          i => (ff.map(i), fa.map(i)),
          (t: (ff.R, fa.R)) => ff.mapResult(t._1)(fa.mapResult(t._2))
        )
    }

}

object MultiMapReduce extends App {

  val fruits: Seq[String] = Seq("apple", "banana", "cherry")

  def mapF(s: String): Char = s.head

  def reduceF(c1: Char, c2: Char): Char = if (c1 > c2) c1 else c2

  val biggestFirsChar = MapReduce(mapF, reduceF)
  val totalChars = MapReduce[String, Int](_.length) // (Semigroup[Int]) reduce by _ + _
  def count[A] = MapReduce[A, Int](_ => 1)

  val multiMapReduce = (biggestFirsChar, totalChars, count[String]).mapN((_, _, _))
  println(multiMapReduce(fruits))

  val sum = MapReduce[Double, Double](identity)
  val average = (sum, count[Double]).mapN(_ / _)
  println(sum(List(1, 2, 3, 4)))
  println(average(List(1, 2, 3, 4)))

}

Runnable version is also available on GitHub.

Upvotes: 1

phipsgabler
phipsgabler

Reputation: 20950

I think you're just trying to reinvent transducers. It's been a while since I have used Scala, but there's at least one implementation.

Upvotes: 1

simpadjo
simpadjo

Reputation: 4017

Interesting question!

I don't know of any such implementation in the standard library or even scalaz/cats. It's not very surprising because if your list is not very large you can just perform map-reduces sequentially and I'm not even sure that overhead of constructing lots of intermediate objects would be smaller than overhead of traversing the list several times.

And if the list is potentially doesn't fit into memory you should be using one of the streaming libraries (fs2/zio-streams/akka-streams)

Although if your input was Iterator instead of List, such functionality would be useful.

There is an interesting article about this problem: https://softwaremill.com/beautiful-folds-in-scala/

tldr: Map-reduce workflow could be formalized as follows:

trait Fold[I, O] {
  type M
  def m: Monoid[M]

  def tally: I => M
  def summarize: M => O
}

In your case I = List[A], tally = list => list.map(mapF), summarize = list => list.reduce(reduceF) .

To run a map-reduce on a list using an instance of fold you need to run

fold.summarize(fold.tally(list))

You can define combine operation on them: def combine[I, O1, O2](f1: Fold[I, O1], f2: Fold[I, O2]): Fold[I, (O1, O2)]

Using combine few times would give you what you want:

combine(combine(f1, f2), f3): Fold[I, ((O1, O2), O3)]

Upvotes: 1

Related Questions