Epsilon

Reputation: 73

Spark Code Optimization

My task is to write code that reads a big file (one that doesn't fit into memory), reverses it, and outputs the five most frequent words.
I have written the code below and it does the job.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ReverseFile {
  def main(args: Array[String]) {


    val conf = new SparkConf().setAppName("Reverse File")
    conf.set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    val txtFile = "path/README_mid.md"
    val txtData = sc.textFile(txtFile)
    txtData.cache()

    val tmp = txtData.map(l => l.reverse).zipWithIndex().map{ case(x,y) => (y,x)}.sortByKey(ascending = false).map{ case(u,v) => v}

    tmp.coalesce(1,true).saveAsTextFile("path/out.md")

    val txtOut = "path/out.md"
    val txtOutData = sc.textFile(txtOut)
    txtOutData.cache()

    val wcData = txtOutData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(ascending = false)
    wcData.collect().take(5).foreach(println)


  }
}

The problem is that I'm new to Spark and Scala, and as you can see in the code, I first read the file, reverse it, and save it, then read the reversed file back in and output the five most frequent words.

Upvotes: 1

Views: 438

Answers (2)

zero323

Reputation: 330423

The most expensive part of your code is sorting, so the obvious improvement is to remove it. It is relatively simple in the second case, where a full sort is completely unnecessary:

val wcData = txtData
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _) // No need to swap or sort

// Use the top method with an explicit ordering in place of swap / sortByKey
val top5 = wcData.top(5)(scala.math.Ordering.by[(String, Int), Int](_._2))
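Since top already returns a local array on the driver, the result can be printed directly without a separate collect (a one-line usage example for the top5 value defined above):

top5.foreach(println) // Array[(String, Int)] with the five most frequent words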

Reversing the order of lines is a little bit trickier. First let's reorder the elements within each partition:

val reversedPartitions = txtData.mapPartitions(_.toList.reverse.toIterator)

Now you have two options:

  • use custom partitioner

    // Requires: import org.apache.spark.Partitioner
    class ReversePartitioner(n: Int) extends Partitioner {
      def numPartitions: Int = n
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Int]
        numPartitions - 1 - k
      }
    }
    
    val partitioner = new ReversePartitioner(reversedPartitions.partitions.size)
    
    val reversed = reversedPartitions
      // Add current partition number
      .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toList)))
      // Repartition to get reversed order
      .partitionBy(partitioner)
      // Drop partition numbers
      .values
      // Reshape
      .flatMap(identity)
    

    It still requires a shuffle, but it is relatively portable, and the data remains accessible in memory.

  • if all you want is to save the reversed data, you can call saveAsTextFile on reversedPartitions and reorder the output files logically. Since the part-n name format identifies the source partition, all you have to do is rename each part-n to part-(number-of-partitions - 1 - n). This requires saving the data, so it is not exactly optimal, but if you use, for example, an in-memory file system, it can be a pretty good solution (a minimal renaming sketch follows below).
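A minimal sketch of that renaming step, assuming the reversed partitions are saved to a hypothetical directory path/reversed_parts and using the Hadoop FileSystem API that Spark already bundles; the reversed-part prefix is only there to avoid clobbering part files that have not been renamed yet:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical output directory for the per-partition reversed data.
val outputDir = "path/reversed_parts"
reversedPartitions.saveAsTextFile(outputDir)

val fs = FileSystem.get(sc.hadoopConfiguration)
val n = reversedPartitions.partitions.size

// Rename part-i to reversed-part-(n - 1 - i) so the files read back in reversed order.
(0 until n).foreach { i =>
  val src = new Path(f"$outputDir/part-$i%05d")
  val dst = new Path(f"$outputDir/reversed-part-${n - 1 - i}%05d")
  if (fs.exists(src)) fs.rename(src, dst)
}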

Upvotes: 2

Reactormonk

Reputation: 21730

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ReverseFile {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Reverse File")
    conf.set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    val txtFile = "path/README_mid.md"
    val txtData = sc.textFile(txtFile)
    txtData.cache()

    val reversed = txtData
      .zipWithIndex()
      .map(_.swap)
      .sortByKey(ascending = false)
      .map(_._2) // No need to deconstruct the tuple.

    // No need for the coalesce, spark should do that by itself.
    reversed.saveAsTextFile("path/reversed.md")

    // Reuse txtData here.
    val wcData = txtData
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .map(_.swap)
      .sortByKey(ascending = false)

    wcData
      .take(5) // Take already collects.
      .foreach(println)
  }
}

Always do the collect() last, so Spark can evaluate things on the cluster.
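As a small illustration (not part of the original answer): collect() first ships the entire word-count RDD to the driver before trimming, while take(5) lets the cluster do the work and returns only five elements:

// Brings every (count, word) pair to the driver, then keeps 5 locally.
val topFiveWasteful = wcData.collect().take(5)

// Returns only the first five elements of the sorted RDD to the driver.
val topFive = wcData.take(5)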

Upvotes: 2
