joesan

Reading a large file using Akka Streams

I'm trying out Akka Streams and here is a short snippet that I have:

  override def main(args: Array[String]) {
    val filePath = "/Users/joe/Softwares/data/FoodFacts.csv"//args(0)

    val file = new File(filePath)
    println(file.getAbsolutePath)
    // read 1MB of file as a stream
    val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024)
    val shaFlow = fileSource.map(chunk => {
      println(s"the string obtained is ${chunk.toString}")
    })
    shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer

    def sha256(s: String) = {
      val  messageDigest = MessageDigest.getInstance("SHA-256")
      messageDigest.digest(s.getBytes("UTF-8"))
    }
  }

When I run this snippet, I get:

Exception in thread "main" java.lang.NullPointerException
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365)
    at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30)
    at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala)

It seems to me that fileSource is just empty. Why is this? Any ideas? FoodFacts.csv is 40 MB in size, and all I'm trying to do is read it as a stream of 1 MB chunks!

Even using the defaultChunkSize of 8192 did not work!

Answers (1)

Jatin

Well, Akka Streams 1.0 is deprecated, so if you can, use 2.x.

When I try this with version 2.0.1, using FileIO.fromFile(file) instead of SynchronousFileSource, the line marked "fails with a null pointer" gives a compile failure. This was simply because there was no ActorMaterializer in scope. Including one makes it work:

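import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}

import scala.concurrent.Future

object TImpl extends App {
  // running a stream needs an implicit materializer, which needs an ActorSystem
  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()

  val file = new File("somefile.csv")
  // emit the file as ByteString chunks of at most 1 MB each
  val fileSource = FileIO.fromFile(file, 1 * 1024 * 1024)
  val shaFlow: Source[String, Future[Long]] = fileSource.map(chunk => {
    s"the string obtained is ${chunk.toString()}"
  })

  shaFlow.runForeach(println(_))
}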

This works for a file of any size. For more information on configuring the dispatcher, refer here.
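
As a side note on the sha256 helper in the question: it hashes one whole string, whereas a chunked source hands you the file piece by piece. If the goal is a single hash of the file, one option is to feed each chunk into the same MessageDigest. Here is a minimal sketch of that idea using only the JDK, with no Akka involved. The names ChunkedSha256, sha256Chunks and toHex are hypothetical, and the in-memory array with 64-byte chunks only stands in for the real file chunks.

```scala
import java.security.MessageDigest

object ChunkedSha256 {
  // Hypothetical helper: updates one digest with every chunk, in order,
  // so the result matches hashing all of the bytes in a single call.
  def sha256Chunks(chunks: Iterator[Array[Byte]]): Array[Byte] = {
    val digest = MessageDigest.getInstance("SHA-256")
    chunks.foreach(chunk => digest.update(chunk))
    digest.digest()
  }

  // Renders the digest bytes as lowercase hex.
  def toHex(bytes: Array[Byte]): String =
    bytes.map(b => "%02x".format(b & 0xff)).mkString

  def main(args: Array[String]): Unit = {
    // Stand-in data; in the stream, each chunk would come from the file source.
    val data = ("a,b,c\n" * 1000).getBytes("UTF-8")
    val chunked = sha256Chunks(data.grouped(64))
    val whole = MessageDigest.getInstance("SHA-256").digest(data)
    println(toHex(chunked) == toHex(whole)) // prints true
  }
}
```

In a stream, the same update call could presumably be applied to each chunk's bytes as it arrives, with digest() called once the source completes. Hashing each chunk separately would instead give one hash per chunk, not a hash of the file.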
