user3248346

Reading a CSV file using Akka Streams

I'm reading a CSV file with Akka Streams so that I can build a graph of actions to perform on each line. I've got the following toy example up and running.

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyAkkaSystem")
    implicit val materializer = ActorMaterializer()

    val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines)
    val sink = Sink.foreach(println)
    source.runWith(sink)
  }

The two Source types don't sit easy with me. Is this idiomatic, or is there a better way to write this?

Upvotes: 15

Views: 14410

Answers (4)

Dmitry Kaltovich

Reputation: 2270

Try this:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ReadStreamApp extends App {
  implicit val actorSystem = ActorSystem()
  import actorSystem.dispatcher
  implicit val flowMaterializer = ActorMaterializer()

  val logFile = Paths.get("src/main/resources/a.csv")

  val source = FileIO.fromPath(logFile)

  val flow = Framing
    .delimiter(ByteString(System.lineSeparator()), maximumFrameLength = 512, allowTruncation = true)
    .map(_.utf8String)

  val sink = Sink.foreach(println)

  source
    .via(flow)
    .runWith(sink)
    .andThen {
      case _ =>
        actorSystem.terminate()
        Await.ready(actorSystem.whenTerminated, 1.minute)
    }
}

Upvotes: 7

Jeffrey Chung

Reputation: 19497

The idiomatic way to read a CSV file with Akka Streams is to use the Alpakka CSV connector. The following example reads a CSV file, converts it to a map of column names (assumed to be the first line in the file) and ByteString values, transforms the ByteString values to String values, and prints each line:

FileIO.fromPath(Paths.get("a.csv"))
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMap())
  .map(_.mapValues(_.utf8String))
  .runForeach(println)
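
For completeness, here is a minimal self-contained sketch of the same pipeline with its imports. It assumes the `akka-stream-alpakka-csv` module is on the classpath and Akka 2.6 or later (where the implicit `ActorSystem` also supplies the materializer; on older versions add an `ActorMaterializer` as in the other answers). The object name and the `readRows` helper are made up for illustration, and `CsvToMap.toMapAsStrings()` is used in place of `toMap()` plus the manual `utf8String` conversion.

```scala
import java.nio.file.{Path, Paths}

import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{FileIO, Sink}

import scala.concurrent.Future

object AlpakkaCsvExample {
  // Hypothetical helper: parses the file into one Map per data row,
  // keyed by the column names taken from the first line.
  // Sink.seq buffers every row in memory, so this suits small files only;
  // for large files keep streaming with runForeach or another sink.
  def readRows(path: Path)(implicit system: ActorSystem): Future[Seq[Map[String, String]]] =
    FileIO.fromPath(path)               // raw bytes of the file
      .via(CsvParsing.lineScanner())    // one List[ByteString] per CSV record
      .via(CsvToMap.toMapAsStrings())   // header row becomes the keys, values decoded as UTF-8
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("csv-reader")
    import system.dispatcher

    readRows(Paths.get("a.csv"))
      .map(_.foreach(println))
      .onComplete(_ => system.terminate()) // shut down on success or failure
  }
}
```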

Upvotes: 16

fcat

Reputation: 1251

Actually, akka-streams provides a function to directly read from a file.

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .runForeach(println)

Here, the runForeach method just prints the lines. If you have a proper Sink to process the lines, use it instead. For example, to split each line on ',' and print the number of fields in it:

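// Sink takes two type parameters; needs akka.Done and scala.concurrent.Future in scope
val sink: Sink[String, Future[Done]] = Sink.foreach[String](x => println(x.split(",").length))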

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .to(sink)
      .run()
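
One thing to watch with `.to(sink).run()`: `to` keeps the source's materialized value, so the sink's completion signal is dropped and you cannot tell when the last line has been processed. Below is a minimal sketch of one way around that, assuming Akka 2.6 or later (implicit `ActorSystem` as materializer). The names `FieldCountExample`, `fieldCount` and `printFieldCounts` are made up for illustration.

```scala
import java.nio.file.{Path, Paths}

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Framing, Keep, Sink}
import akka.util.ByteString

import scala.concurrent.Future

object FieldCountExample {
  // Number of comma-separated fields in a line. The -1 limit keeps
  // trailing empty fields. Naive: quoted fields containing commas
  // are not handled; use a real CSV parser for that.
  def fieldCount(line: String): Int = line.split(",", -1).length

  val sink: Sink[String, Future[Done]] =
    Sink.foreach[String](line => println(fieldCount(line)))

  // Keep.right selects the sink's Future[Done], which completes
  // once the whole file has been processed (or fails with the error).
  def printFieldCounts(path: Path)(implicit system: ActorSystem): Future[Done] =
    FileIO.fromPath(path)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
      .map(_.utf8String)
      .toMat(sink)(Keep.right)
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("field-count")
    import system.dispatcher

    printFieldCounts(Paths.get("a.csv")).onComplete(_ => system.terminate())
  }
}
```

`runWith(sink)` is shorthand for the same `toMat(sink)(Keep.right).run()` combination. Note also that with a "\n" delimiter, files with Windows line endings will leave a trailing "\r" on each line.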

Upvotes: 23

expert

Reputation: 30085

Yeah, it's OK because these are different Sources. But if you don't like scala.io.Source, you can read the file yourself (which we sometimes have to do, e.g. when the source CSV file is zipped) and then parse it from the resulting InputStream like this:

StreamConverters.fromInputStream(() => input)
  .via(Framing.delimiter(ByteString("\n"), 4096))
  .map(_.utf8String)
  .collect { line =>
    line
  }
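
To make the zipped case concrete, here is a minimal sketch assuming a gzip-compressed file (the file name `a.csv.gz`, the object name and the `gzipLines` helper are hypothetical) and Akka 2.6 or later. It passes `allowTruncation = true`, since with the default of `false` the framing stage fails when the last line has no trailing newline.

```scala
import java.io.FileInputStream
import java.util.zip.GZIPInputStream

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{Framing, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Future

object GzipCsvExample {
  // Hypothetical helper: streams the lines of a gzip-compressed text file.
  // The InputStream factory is only invoked when the stream is materialized,
  // so the returned Source can be run more than once.
  def gzipLines(path: String): Source[String, Future[IOResult]] =
    StreamConverters
      .fromInputStream(() => new GZIPInputStream(new FileInputStream(path)))
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = true))
      .map(_.utf8String)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("gzip-csv")
    import system.dispatcher

    gzipLines("a.csv.gz")
      .runForeach(println)
      .onComplete(_ => system.terminate())
  }
}
```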

Having said that, consider using Apache Commons CSV with akka-stream. You may end up writing less code :)
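
A rough sketch of what that combination could look like, assuming the `commons-csv` library is on the classpath and Akka 2.6 or later. The object name and the `records` helper are made up for illustration, and this is only one possible wiring, not an established recipe.

```scala
import java.io.{FileReader, Reader}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import org.apache.commons.csv.CSVFormat

// On Scala 2.13+, scala.jdk.CollectionConverters._ is the non-deprecated equivalent.
import scala.collection.JavaConverters._

object CommonsCsvExample {
  // Hypothetical helper: emits one Vector of field values per CSV record.
  // Commons CSV does the parsing (quoting, escapes); Akka only drives the iterator.
  // Caveats: the reads are blocking, and this sketch never closes the Reader.
  def records(open: () => Reader): Source[Vector[String], NotUsed] =
    Source.fromIterator { () =>
      CSVFormat.DEFAULT
        .parse(open())
        .iterator().asScala
        .map(record => record.iterator().asScala.toVector)
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("commons-csv")
    import system.dispatcher

    records(() => new FileReader("a.csv"))
      .runForeach(println)
      .onComplete(_ => system.terminate())
  }
}
```

For anything beyond a toy, `Source.unfoldResource` would be a better fit than `fromIterator`, since it gives you a hook to close the parser when the stream ends.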

Upvotes: 2
