Slavik Muz

Reputation: 1217

akka stream alpakka csv: skip exception and parse next rows

I'm using Alpakka to parse CSV files, version "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.20". I have a CSV file with an unclosed quote:

email
[email protected]
"[email protected]
[email protected]
[email protected]

I want to skip bad rows and continue with the next ones, but my stream fails.

I'm using a supervision strategy with Supervision.Resume, but it has no effect.

The stream fails as soon as it encounters the unclosed quote.

Is there any way to fix that?

My code:

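import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, IOResult, Supervision}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future
import scala.util.Success

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

// Stand-in for the real HDFS source: emits the whole CSV text as one ByteString
def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

// The second line opens a double quote that is never closed
val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

// Intended behaviour: resume on any failure, i.e. drop the bad row and keep going
val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)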

Upvotes: 0

Views: 541

Answers (1)

dvim

Reputation: 2253

Currently, CsvParsing.lineScanner() does not support supervision strategies. As a workaround, you can choose a different quote character for the line scanner, e.g. CsvParsing.lineScanner(quoteChar = '\''). The unclosed double quote then simply becomes part of the parsed result:

Map(email -> "test, country -> test, name -> test)
Map(email -> test, country -> test, name -> test)
Map(email -> test, country -> test, name -> test)
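A minimal self-contained sketch of that workaround, assuming the same Alpakka 0.20 / Akka 2.5 APIs used in the question. The object name QuoteCharWorkaround and the helper parse are hypothetical names chosen for illustration. The sketch also assumes the real data never uses single quotes to quote fields; if it does, pick a byte that never occurs in the input instead.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical helper object illustrating the quoteChar workaround.
object QuoteCharWorkaround {

  // Same sample as in the question: the second line has an unclosed double quote.
  val sample: String =
    """email,country,name
      |"test,test,test
      |test,test,test
      |test,test,test
      |""".stripMargin

  // Parses `csv` with a single quote (') as the quote character, so a stray
  // double quote is treated as ordinary field content instead of opening a
  // quoted field that never closes. Assumption: the input never quotes
  // fields with single quotes.
  def parse(csv: String)(implicit mat: Materializer): Future[Seq[Map[String, String]]] =
    Source
      .single(ByteString(csv))
      .via(CsvParsing.lineScanner(quoteChar = '\''))
      .via(CsvToMap.toMapAsStrings())
      .runWith(Sink.seq)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("QuoteCharWorkaround")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val rows = Await.result(parse(sample), 5.seconds)
      rows.foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

Note that this does not skip the malformed row: it keeps it, with the leading double quote as part of the email value, so any validation has to happen downstream of the parser.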

Upvotes: 0
