CPS

Reputation: 537

Can alpakka-xml process multiple XML files?

I'm having trouble using Alpakka's XmlParsing flow on more than one file:

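import java.nio.file.Paths
import akka.NotUsed
import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl.{FileIO, Flow, Sink}

val files: List[String] = ... // file paths locally on disk

// simple source emitting the contents of 2 XML files, one after the other
val documentSource = FileIO.fromPath(Paths.get(files.head))
  .concat(FileIO.fromPath(Paths.get(files(1))))

val contentFlow: Flow[ParseEvent, CustomContent, NotUsed] =
  Flow.fromGraph(new ContentProcessorFlow)

documentSource
  .via(XmlParsing.parser)
  .via(contentFlow)
  .to(Sink.foreach(println))
  .run() // needs an implicit Materializer in scope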

When this runs, the graph prints the elements that contentFlow emits for the first file, and they match the expected values. Right after those, this exception is thrown:

[ERROR] [12/20/2018 16:32:23.648] [Sync-akka.actor.default-dispatcher-2] [akka://Sync/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [akka.stream.alpakka.xml.impl.StreamingXmlParser@36b80955]: Illegal processing instruction target: 'xml' (case insensitive) is reserved by the xml specification
 at [row,col {unknown-source}]: [44,17]
com.fasterxml.aalto.WFCException: Illegal processing instruction target: 'xml' (case insensitive) is reserved by the xml specification
 at [row,col {unknown-source}]: [44,17]
at com.fasterxml.aalto.in.XmlScanner.reportInputProblem(XmlScanner.java:1333)
at com.fasterxml.aalto.async.AsyncByteScanner.checkPITargetName(AsyncByteScanner.java:665)
at com.fasterxml.aalto.async.AsyncByteArrayScanner.handlePI(AsyncByteArrayScanner.java:2091)
at com.fasterxml.aalto.async.AsyncByteArrayScanner.nextFromProlog(AsyncByteArrayScanner.java:1064)
at com.fasterxml.aalto.stax.StreamReaderImpl.next(StreamReaderImpl.java:802)
at akka.stream.alpakka.xml.impl.StreamingXmlParser$$anon$1.advanceParser(StreamingXmlParser.scala:55)

I understand the basics of what's happening here: the parser is complaining about the XML declaration (<?xml version="1.0" encoding="UTF-8"?>) at the top of the second file, which reaches it as just more bytes of the same stream. I'm not familiar enough with streams to know what to do about it, though. If I remove the declaration, I get a slightly different exception about having two root elements.
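The failure can be reproduced without any files. This is a minimal sketch, not the code from the question: it assumes Akka 2.6+ (so an implicit ActorSystem supplies the Materializer), and `docA`, `docB` and `parseConcatenated` are hypothetical names. Two complete in-memory documents go through a single `XmlParsing.parser`, the same shape as `.concat` followed by one parser, so the returned Future would be expected to fail with an error like the one above rather than complete.

```scala
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.alpakka.xml.ParseEvent
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

object ConcatRepro {
  // Two complete, hypothetical documents, each with its own XML declaration
  val docA: String = """<?xml version="1.0" encoding="UTF-8"?><a/>"""
  val docB: String = """<?xml version="1.0" encoding="UTF-8"?><b/>"""

  // Both documents flow through ONE parser instance, like
  // FileIO.fromPath(...).concat(FileIO.fromPath(...)).via(XmlParsing.parser).
  // Once </a> closes the root element, the parser is still inside document A,
  // so the second "<?xml ...?>" is read as an illegal processing instruction.
  def parseConcatenated()(implicit system: ActorSystem): Future[Seq[ParseEvent]] =
    Source(List(ByteString(docA), ByteString(docB)))
      .via(XmlParsing.parser)
      .runWith(Sink.seq)
}
```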

My goal is to build a graph that reads files from a location and emits CustomContent for further processing. How can I rework this to treat each file as a distinct unit of input?

Upvotes: 0

Views: 162

Answers (1)

Jeffrey Chung

Reputation: 19517

Treat the files as distinct Sources, each with its own parser, then concatenate them into one Source:

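import java.nio.file.Paths
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl.{FileIO, Source}

val files: List[String] = ???

// one Source per file, each with its own XmlParsing.parser,
// so every file is parsed as a separate document
val sources: List[Source[CustomContent, Future[IOResult]]] =
  files
    .map { f =>
      FileIO.fromPath(Paths.get(f))
        .via(XmlParsing.parser)
        .via(contentFlow)
    }

// flatMapConcat runs the per-file sources one after another, in list order
val mergedSource: Source[CustomContent, NotUsed] =
  Source(sources).flatMapConcat(identity)

mergedSource.runForeach(println) // needs an implicit Materializer in scope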
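The same per-file idea can be sketched end to end, including the "read files from a location" part of the question. This is a hedged illustration that assumes Akka 2.6+ and Scala 2.13 (for `scala.jdk.CollectionConverters`). The helpers `xmlFilesIn`, `events` and `startElementNames` are hypothetical, and plain `ParseEvent`s stand in for `CustomContent` because `contentFlow` isn't shown.

```scala
import java.nio.file.{Files, Path}
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.xml.{ParseEvent, StartElement}
import akka.stream.alpakka.xml.scaladsl.XmlParsing
import akka.stream.scaladsl.{FileIO, Sink, Source}

object PerFileXml {
  // Hypothetical helper: the *.xml files directly inside dir, sorted by name
  def xmlFilesIn(dir: Path): List[Path] = {
    val listing = Files.list(dir)
    try {
      listing.iterator().asScala
        .filter(_.getFileName.toString.endsWith(".xml"))
        .toList
        .sortBy(_.getFileName.toString)
    } finally listing.close()
  }

  // One FileIO source and one parser per file; flatMapConcat runs them
  // one after another, so each document starts with a fresh parser
  def events(files: List[Path]): Source[ParseEvent, NotUsed] =
    Source(files).flatMapConcat(path => FileIO.fromPath(path).via(XmlParsing.parser))

  // Illustration only: local names of every start tag, in document order
  def startElementNames(files: List[Path])(implicit system: ActorSystem): Future[Seq[String]] =
    events(files)
      .collect { case s: StartElement => s.localName }
      .runWith(Sink.seq)
}
```

Swapping `flatMapConcat` for `flatMapMerge(n, ...)` would let up to `n` files be parsed concurrently, each still with its own parser, at the cost of interleaving events from different files.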

Upvotes: 1
