Reputation: 15345
I'm trying out Akka Streams and here is a short snippet that I have:
override def main(args: Array[String]) {
  val filePath = "/Users/joe/Softwares/data/FoodFacts.csv" // args(0)
  val file = new File(filePath)
  println(file.getAbsolutePath)
  // read 1MB of file as a stream
  val fileSource = SynchronousFileSource(file, 1 * 1024 * 1024)
  val shaFlow = fileSource.map(chunk => {
    println(s"the string obtained is ${chunk.toString}")
  })
  shaFlow.to(Sink.foreach(println(_))).run // fails with a null pointer
  def sha256(s: String) = {
    val messageDigest = MessageDigest.getInstance("SHA-256")
    messageDigest.digest(s.getBytes("UTF-8"))
  }
}
When I run this snippet, I get:
Exception in thread "main" java.lang.NullPointerException
at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:365)
at com.test.api.consumer.DataScienceBoot$.main(DataScienceBoot.scala:30)
at com.test.api.consumer.DataScienceBoot.main(DataScienceBoot.scala)
It seems to me that fileSource is just empty? Why is this? Any ideas? FoodFacts.csv is 40MB in size, and all I'm trying to do is create a 1MB stream of data!
Even using the default chunk size of 8192 did not work!
Upvotes: 3
Views: 3114
Reputation: 31724
Well, 1.0 is deprecated, so use 2.x if you can.
When I try version 2.0.1, using FileIO.fromFile(file) instead of SynchronousFileSource, it is a compile failure with the message "fails with null pointer". This was simply because there was no ActorMaterializer in scope. Including it makes it work:
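import java.io.File
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Source}

object TImpl extends App {
  // Both implicits must be in scope before the stream is run
  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  val file = new File("somefile.csv")
  // read the file in chunks of 1MB
  val fileSource = FileIO.fromFile(file, 1 * 1024 * 1024)
  val shaFlow: Source[String, Future[Long]] = fileSource.map(chunk => {
    s"the string obtained is ${chunk.toString()}"
  })
  shaFlow.runForeach(println(_))
}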
This works for files of any size. For more information on configuring the dispatcher, refer here.
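The sha256 helper in the question is defined but never called. If the end goal is a SHA-256 of the whole file, one option is to feed every chunk into a single MessageDigest instead of hashing strings. Below is a minimal sketch using only the JDK; the names ChunkedSha256 and sha256Hex are made up for illustration, and the chunks are modelled as a plain Iterator[Array[Byte]] rather than an Akka Source, so it can be run without any Akka dependency.

```scala
import java.security.MessageDigest

// Hypothetical helper, not part of Akka: hashes data that arrives in chunks.
object ChunkedSha256 {
  // Feed each chunk into one MessageDigest, then hex-encode the final digest.
  def sha256Hex(chunks: Iterator[Array[Byte]]): String = {
    val md = MessageDigest.getInstance("SHA-256")
    chunks.foreach(bytes => md.update(bytes))
    md.digest().map(b => "%02x".format(b)).mkString
  }

  def main(args: Array[String]): Unit = {
    val data = "abc".getBytes("UTF-8")
    // grouped(2) stands in for a file arriving in small chunks
    println(sha256Hex(data.grouped(2)))
  }
}
```

Because the digest is updated incrementally, the result does not depend on how the bytes are split into chunks. In the stream above, folding over the chunks and calling update on each chunk's bytes should play the same role, though I have not run that against 2.0.1.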
Upvotes: 4