Michael

Reputation: 10303

How to process files and write the results in parallel in Scala?

This is a follow-up to my previous question.

Suppose I process my files in parallel. Now I would like to write the processing results to a file. Since the results do not fit in memory, I cannot just wait until all files have been processed and then write the results. I have to do the processing and the writing in parallel somehow.

For example, suppose I have files containing numbers. Each file is about 500 MB, and there are about 200 files. Each file fits in memory, but all of them together do not. Now I would like to write all even numbers found in these files to another file.

How can I do that in Scala (with Futures and Scala parallel collections)?
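One Futures-only way to get the "process in parallel, write without interleaving" behaviour is sketched below. It is a minimal sketch under stated assumptions, not a definitive solution: the input format (one integer per line), the names `EvenNumbersPipeline`, `evensIn` and `run`, and the choice of a dedicated single-threaded executor for writing are all hypothetical. Parsing runs on `ExecutionContext.global`, which has roughly one thread per core, so only a few files are read at once; every write is scheduled on one writer thread, so no lock is needed. It uses only the standard library (parallel collections live in a separate module since Scala 2.13).

```scala
import java.io.{BufferedWriter, FileWriter}
import java.nio.file.Path
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.io.Source

object EvenNumbersPipeline {

  // Hypothetical input format: one integer per line. Returns the even ones in file order.
  def evensIn(file: Path): Vector[Long] = {
    val src = Source.fromFile(file.toFile)
    try src.getLines().map(_.trim).filter(_.nonEmpty).map(_.toLong).filter(_ % 2 == 0).toVector
    finally src.close()
  }

  def run(inputs: Seq[Path], output: Path): Unit = {
    // Parsing runs on the global pool (about one thread per core),
    // so only a handful of input files are being read at any one time.
    implicit val workers: ExecutionContext = ExecutionContext.global
    // A single-threaded executor owns the output file: all writes run on
    // that one thread, so they never interleave and need no lock.
    val writerPool = Executors.newSingleThreadExecutor()
    val writerEc = ExecutionContext.fromExecutor(writerPool)
    val out = new BufferedWriter(new FileWriter(output.toFile))
    try {
      val done = inputs.map { file =>
        Future(evensIn(file)).map { evens =>
          evens.foreach(n => out.write(s"$n\n"))
        }(writerEc)
      }
      // Wait until every file has been parsed and its results written.
      Await.result(Future.sequence(done), Duration.Inf)
    } finally {
      out.close()
      writerPool.shutdown()
    }
  }
}
```

The order of files in the output is nondeterministic, but each file's numbers are written contiguously. If writing is much slower than parsing, finished results can still pile up in the writer's queue; bounding that would need some form of backpressure, which this sketch leaves out.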

Upvotes: 2

Views: 2022

Answers (2)

Thomas Luechtefeld

Reputation: 1466

For those not familiar with Akka, here is a self-contained example of the writer-actor approach:

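import java.io.{File, PrintWriter}
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.gracefulStop

object AkkaWriterExample extends App {

  val outputPath: String = ???
  val system = ActorSystem("WriterSystem")
  val writer = system.actorOf(Props(new WriterActor(new File(outputPath))), name = "writer")
  writer ! "this is a test"

  // gracefulStop sends a PoisonPill, which is queued behind the pending messages,
  // so everything already sent is written before the actor stops
  Await.result(gracefulStop(writer, 10.seconds), 11.seconds)
  system.terminate()
  Await.result(system.whenTerminated, Duration.Inf)
}

class WriterActor(outFile: File) extends Actor {

  val writer = new PrintWriter(outFile)

  // this is how you implement an Akka actor;
  // plain Scala actors look a bit different
  def receive = {
    case str: String => writer.println(str) // one result per line
  }

  // runs once the actor has stopped: flush buffered output and release the file
  override def postStop(): Unit = {
    writer.flush()
    writer.close()
  }
}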

Upvotes: 1

drexin

Reputation: 24403

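At some point you have to synchronize the writing. If you don't want to block the other threads, one possibility is to use an actor to write the results to a file: an actor processes its messages one at a time, so the writes never interleave, and the senders don't wait for the write to happen. This could look like the following: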

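import java.io.FileWriter
import akka.actor.{Actor, Props}

class FileWriterActor(path: String) extends Actor {

  val file = new FileWriter(path)

  // this is how you implement an Akka actor;
  // plain Scala actors look a bit different
  def receive = {
    case x: MyResult => file.write(x.toString + "\n") // MyResult is your own result type
  }

  override def postStop(): Unit = file.close()
}

// usage (assumes an ActorSystem `system` and an output `path` are in scope)
val fileWriter = system.actorOf(Props(new FileWriterActor(path)), "fileWriter")
val result = ... // calculation stuff
fileWriter ! result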
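For comparison, the blocking alternative that this answer avoids is to share one writer and take a lock around each write. The sketch below is a minimal illustration under assumptions (hypothetical object name `SynchronizedWriterExample`, one integer per line in each input, standard library only): every worker parses its file in a `Future`, then waits on `out.synchronized` until no other worker is writing. It is correct, but workers sit idle while another one holds the lock, which is exactly the cost the actor version removes.

```scala
import java.io.{BufferedWriter, FileWriter}
import java.nio.file.Path
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.io.Source

object SynchronizedWriterExample {

  def run(inputs: Seq[Path], output: Path): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val out = new BufferedWriter(new FileWriter(output.toFile))
    try {
      val done = inputs.map { file =>
        Future {
          // Parse one file (hypothetical format: one integer per line).
          val src = Source.fromFile(file.toFile)
          val evens =
            try src.getLines().map(_.trim).filter(_.nonEmpty).map(_.toLong).filter(_ % 2 == 0).toVector
            finally src.close()
          // Only one worker may hold the lock; the others block here until it is free.
          out.synchronized {
            evens.foreach(n => out.write(s"$n\n"))
          }
        }
      }
      Await.result(Future.sequence(done), Duration.Inf)
    } finally out.close()
  }
}
```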

Upvotes: 5
