DarqMoth
DarqMoth

Reputation: 603

Scalding: Compare strings pairwise?

With Scalding I need to:

  1. Group string fields by first 3 chars
  2. Compare strings in all pairs in every group using edit-distance metric ( http://en.wikipedia.org/wiki/Edit_distance)
  3. Write results in CSV file where record is string; string; distance

To group strings I use map and groupBy as in the following example:

import cascading.tuple.Fields
import com.twitter.scalding._

class Scan(args: Args) extends Job(args) {
  val output = TextLine("tmp/out.txt")

  val wordsList = List(
    ("aaaa"),
    ("aaabb"),
    ("aabbcc"),
    ("aaabccdd"),
    ("aaabbccdde"),
    ("aaabbddd"),
    ("bbbb"),
    ("bbbaaa"),
    ("bbaaabb"),
    ("bbbcccc"),
    ("bbbddde"),
    ("ccccc"),
    ("cccaaa"),
    ("ccccaabbb"),
    ("ccbbbddd"),
    ("cdddeee")
    )

  val orderedPipe =
    IterableSource[(String)](wordsList, ('word))
        .map('word -> 'key ){word:String => word.take(3)}
    .groupBy('key) {_.toList[String]('word -> 'x) }
        .debug
        .write(output)
}

As a result I get:

['aaa', 'List(aaabbddd, aaabbccdde, aaabccdd, aaabb, aaaa)']
['aab', 'List(aabbcc)']
['bba', 'List(bbaaabb)']
['bbb', 'List(bbbddde, bbbcccc, bbbaaa, bbbb)']
['ccb', 'List(ccbbbddd)']
['ccc', 'List(ccccaabbb, cccaaa, ccccc)']
['cdd', 'List(cdddeee)']

Now, in this example, I need to comute edit-distance for strings with aaa key in this list:

List(aaabbddd, aaabbccdde, aaabccdd, aaabb, aaaa)

next for all strings with 'bbb' key in this list:

List(bbbddde, bbbcccc, bbbaaa, bbbb)

etc.

To compute edit-distance between all strings in every group I need to replace toList with my own function, how can I do this? And also how can I write results of my function to a CSV file?

Thanks!

Update

How to get List from Scalding Pipe?

toList just returns another Pipe so I can't use it all:

  val orderedPipe =
    IterableSource[(String)](wordsList, ('word))
        .map('word -> 'key ){word:String => word.take(3)}
        .groupBy('key) {_.toList[String]('word -> 'x) }
        .combinations(2) //---ERROR! Pipe has no such method!
        .debug
        .write(output)

Upvotes: 1

Views: 591

Answers (1)

Shyamendra Solanki
Shyamendra Solanki

Reputation: 8851

The edit-distance can be calculated as described in wikipedia:

def editDistance(a: String, b: String): Int = {

    import scala.math.min

    def min3(x: Int, y: Int, z: Int) = min(min(x, y), z)

    val (m, n) = (a.length, b.length)

    val matrix = Array.fill(m + 1, n + 1)(0)

    for (i <- 0 to m; j <- 0 to n) {

        matrix(i)(j) = if (i == 0) j
                       else if (j == 0) i
                       else if (a(i-1) == b(j-1)) matrix(i-1)(j-1)
                       else min3(
                                 matrix(i - 1)(j) + 1,
                                 matrix(i)(j-1) + 1,
                                 matrix(i - 1)(j - 1) + 1) 
    }

    matrix(m)(n)
}

For finding pairwise edit-distance of elements of a list:

def editDistances(list: List[String]) = {

    list.combinations(2).toList.map(x => (x(0), x(1), editDistance(x(0), x(1))))
}

use this in groupBy:

  val orderedPipe =
      IterableSource[(String)](wordsList, ('word))
      .map('word -> 'key ){word:String => word.take(3)}
      .groupBy('key) {_.mapList[String, List[(String, String, Int)]]('word -> 'x)(editDistances)}
      .debug
      .write(output)    

As far as writing to csv format is concerned, you can simply use the com.twitter.scalding.Csv class.

write(Csv(outputFile))

Upvotes: 1

Related Questions