valenjimmo

Reputation: 13

Scala Iterator/Looping Technique - Large Collections

I have really large tab-delimited files (10GB-70GB) and need to read them, do some data manipulation, and write the results to a separate file. The files can range from 100 to 10K columns, with 2 million to 5 million rows.

The first x columns are static and are required for reference. Sample file format:

#ProductName  Brand    Customer1  Customer2  Customer3
Corolla       Toyota   Y          N          Y
Accord        Honda    Y          Y          N
Civic         Honda    0          1          1

I need to use the first 2 columns to get a product ID and then generate an output file similar to:

ProductID1 Customer1 Y
ProductID1 Customer2 N
ProductID1 Customer3 Y
ProductID2 Customer1 Y
ProductID2 Customer2 Y
ProductID2 Customer3 N
ProductID3 Customer1 N
ProductID3 Customer2 Y
ProductID3 Customer3 Y

Current sample code:

import java.io.PrintWriter

val fileNameAbsPath = filePath + fileName
val outputFile = new PrintWriter(filePath + outputFileName)
var customerList = Array[String]()

for(line <- scala.io.Source.fromFile(fileNameAbsPath).getLines()) {
    if(line.startsWith("#")) {
        // header row: remember the customer names for later lookups
        customerList = line.split("\t")
    }
    else {
        val cols = line.split("\t")

        // getProductID and parser are helper functions defined elsewhere (not shown)
        val productid = getProductID(cols(0), cols(1))
        for (i <- (2 until cols.length)) {
            val rowOutput = productid + "\t" + customerList(i) + "\t" + parser(cols(i))

            outputFile.println(rowOutput)
            outputFile.flush()
        }
    }
}
outputFile.close()

One of the tests that I ran took about 12 hours to read a file (70GB) with 3 million rows and 2500 columns. The final output file came to about 250GB, with 800+ million rows.

My question is: is there anything in Scala other than what I'm already doing that can offer better performance?

Upvotes: 0

Views: 67

Answers (1)

Dima

Reputation: 40508

Ok, some ideas ...

  • As mentioned in the comments, you don't want to flush after every line. So, yeah, get rid of it.
  • Moreover, if the PrintWriter is created with autoFlush enabled, it flushes after every println anyway (so you can easily end up flushing twice :)). When you build the PrintWriter yourself, use a constructor that takes an autoFlush argument and make sure you pass false (see the writer sketch after this list).
  • You don't need to create a BufferedWriter explicitly; PrintWriter already buffers by default. The default buffer size is 8K. You might want to play around with it, but it will probably not make any difference because, last I checked, the underlying FileOutputStream ignores all that and flushes kilobyte-sized chunks either way.
  • Get rid of gluing the row together in a variable and just write each field straight to the output (the tokenizer sketch after this list does exactly that).
  • If you do not care about the order in which lines appear in the output, you can trivially parallelize the processing (if you do care about the order, you still can, just a little less trivially) and write several files at once. That would help tremendously if you place your output chunks on different disks and/or have multiple cores to run this code. You'd need to rewrite your code in (real) Scala to make it thread safe, but that should be easy enough; a rough sketch follows this list.
  • Compress the data as it is being written, with GZIPOutputStream for example (see the writer sketch after this list). That not only reduces the physical amount of data actually hitting the disks, but also allows for a much larger buffer.
  • Check out what your parser thingy is doing. You didn't show the implementation, but something tells me it is likely not free.
  • split can get prohibitively expensive on huge strings. People often forget that its parameter is actually a regular expression. You are probably better off writing a custom iterator or just using good old StringTokenizer to parse the fields out as you go, rather than splitting up front (sketched below). At the very least, it'll save you one extra scan per line.
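
For the writer points above, here is a minimal sketch of the setup I mean: no flush() calls in the loop, autoFlush set to false, a bigger buffer, and GZIP compression stacked on top. The buffer sizes and the ".gz" file name are just assumptions to tune, not requirements.

import java.io.{BufferedOutputStream, FileOutputStream, OutputStreamWriter, PrintWriter}
import java.util.zip.GZIPOutputStream

// Sketch: buffered, compressed, non-auto-flushing writer
val out = new PrintWriter(
  new OutputStreamWriter(
    new GZIPOutputStream(
      new BufferedOutputStream(new FileOutputStream(filePath + outputFileName + ".gz"), 1 << 16),
      1 << 16),               // GZIP's own internal buffer
    "UTF-8"),
  false)                      // autoFlush = false: nothing flushes on println

// ... write rows with out.print / out.println, no flush() inside the loop ...

out.close()                   // flushes once, at the end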
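
And a sketch of the StringTokenizer idea, combined with writing each field straight to the output instead of gluing a row string together. getProductID, parser and customerList are your own definitions, assumed unchanged. One caveat: StringTokenizer skips empty fields, so if your data can contain empty cells you'd want a hand-rolled indexOf-based iterator instead.

import java.util.StringTokenizer

// Inside the per-line branch of the loop: one scan over the line,
// no regex, no intermediate row string.
val st = new StringTokenizer(line, "\t")
val productid = getProductID(st.nextToken(), st.nextToken())
var i = 2
while (st.hasMoreTokens) {
  outputFile.print(productid)
  outputFile.print('\t')
  outputFile.print(customerList(i))
  outputFile.print('\t')
  outputFile.println(parser(st.nextToken()))
  i += 1
}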
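
The parallel version is the most hand-wavy of the three. Here is one rough way to do it with plain Futures, assuming order does not matter, that the header is the first line of the file, and that getProductID and parser are thread safe. Chunk size and worker count are arbitrary numbers to tune; each chunk goes to its own part file, so no two threads ever share a writer.

import java.io.{BufferedWriter, FileWriter, PrintWriter}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val chunkSize  = 10000   // rows per chunk; tune so numWorkers chunks fit in memory
val numWorkers = 4

val source = scala.io.Source.fromFile(fileNameAbsPath)
val lines  = source.getLines()
val header = lines.next().split("\t")   // the '#'-prefixed customer header

lines.grouped(chunkSize).zipWithIndex.grouped(numWorkers).foreach { batch =>
  val work = batch.map { case (chunk, idx) =>
    Future {
      // each chunk writes its own part file, so there is no shared mutable state
      val out = new PrintWriter(new BufferedWriter(new FileWriter(s"${filePath}part-$idx.tsv")))
      for (line <- chunk) {
        val cols = line.split("\t")
        val productid = getProductID(cols(0), cols(1))
        for (i <- 2 until cols.length) {
          out.print(productid); out.print('\t')
          out.print(header(i)); out.print('\t')
          out.println(parser(cols(i)))
        }
      }
      out.close()
    }
  }
  Await.result(Future.sequence(work), Duration.Inf)   // reading stays on this thread
}
source.close()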

Finally, last but by no measure least: consider using Spark and HDFS. This kind of problem is the very area where those tools really excel; a minimal sketch follows.
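
For completeness, a minimal sketch of the Spark route, assuming the same file layout (a '#'-prefixed, tab-delimited header) and that getProductID and parser can be shipped to the executors:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("wide-to-long").getOrCreate()
val raw   = spark.sparkContext.textFile(fileNameAbsPath)

val header    = raw.first()            // the '#'-prefixed header line
val customers = header.split("\t")

val result = raw
  .filter(_ != header)
  .flatMap { line =>
    val cols = line.split("\t")
    val productid = getProductID(cols(0), cols(1))
    (2 until cols.length).map(i => s"$productid\t${customers(i)}\t${parser(cols(i))}")
  }

result.saveAsTextFile(filePath + outputFileName)   // writes a directory of part files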

Upvotes: 1
