Reputation: 1217
I'm using Alpakka to parse CSV files (version: "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "0.20"). I have a CSV file with an unclosed quote:
email
[email protected]
"[email protected]
[email protected]
[email protected]
I want to skip bad rows and continue with the next one, but my stream fails.
I'm using a supervision strategy with Supervision.Resume, but it does not work.
The stream fails as soon as it encounters the unclosed quote.
Is there any way to fix that?
My code:
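import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, IOResult, Supervision}
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.Success

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

// Stand-in for the real HDFS source: emits the whole CSV as a single ByteString
def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

// The first data row has an unclosed double quote
val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

// Resume on any failure (this is the part that has no effect on the parser)
val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)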
Reputation: 2253
Currently, CsvParsing.lineScanner() does not support supervision strategies. As a workaround, you can choose another character as the quote character for the line scanner, e.g. CsvParsing.lineScanner(quoteChar = '\''). The unclosed double quote then becomes part of the parsed result instead of failing the stream:
Map(email -> "test, country -> test, name -> test)
Map(email -> test, country -> test, name -> test)
Map(email -> test, country -> test, name -> test)
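If the goal is to drop the malformed row rather than keep it, one possible follow-up to the workaround above is to filter out rows that still contain a double quote. This is only a sketch: hasStrayQuote and the SkipBadRows object are hypothetical names introduced here, and the approach assumes the input never uses double quotes for legitimate quoting, because with quoteChar = '\'' the parser treats every double quote as ordinary data.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.Source
import akka.util.ByteString

object SkipBadRows {
  // Hypothetical helper (not part of Alpakka): a row counts as "bad"
  // if any of its values still contains a double quote character.
  def hasStrayQuote(row: Map[String, String]): Boolean =
    row.values.exists(_.contains("\""))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("QuickStart")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Same sample data as in the question: the first data row is malformed
    val csv =
      """email,country,name
        |"test,test,test
        |test,test,test
        |test,test,test
        |""".stripMargin

    val done = Source
      .single(ByteString(csv))
      // Use the single quote as quote character so that an unclosed
      // double quote is read as plain data instead of failing the stream
      .via(CsvParsing.lineScanner(quoteChar = '\''))
      .via(CsvToMap.toMapAsStrings())
      // Drop the rows flagged by the helper above
      .filterNot(hasStrayQuote)
      .runForeach(println)

    // Shut the actor system down once the stream has finished
    done.onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

Given the output shown above, this should drop the first data row and print only the two well-formed ones. A row that contains a legitimately quoted field would be dropped as well, so this only fits data where quoting is not expected.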