David Soergel

Reputation: 1699

How to get a streaming Iterator[Node] from a large XML document?

I need to process XML documents that consist of a very large number of independent records, e.g.

<employees>
    <employee>
         <firstName>Kermit</firstName>
         <lastName>Frog</lastName>
         <role>Singer</role>
    </employee>
    <employee>
         <firstName>Oscar</firstName>
         <lastName>Grouch</lastName>
         <role>Garbageman</role>
    </employee>
    ...
</employees>

In some cases these are just big files, but in others they may come from a streaming source.

I can't just call scala.xml.XML.load() because I don't want to hold the whole document in memory (or wait for the input stream to close) when I only need to work with one record at a time. I know I can use XMLEventReader to stream the input as a sequence of XMLEvents. These are, however, much less convenient to work with than scala.xml.Node.

So I'd like to get a lazy Iterator[Node] out of this somehow, in order to operate on each individual record using the convenient Scala syntax, while keeping memory usage under control.

To do this myself, I could start with an XMLEventReader, build up a buffer of events between each matching start and end tag, and then construct a Node tree from that. But is there an easier way that I've overlooked? Thanks for any insights!
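
For illustration, here is a minimal sketch of that "collect everything between a record's start and end tag" idea. It swaps in the JDK's StAX cursor API (javax.xml.stream) for XMLEventReader, and it collects each record into a plain Map instead of a scala.xml.Node, so it shows the shape of the approach rather than the Node-based result asked for. The names RecordIterator and records are hypothetical, and the sketch assumes flat records whose children contain only text.

```scala
import java.io.StringReader
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants, XMLStreamReader}

object RecordIterator {
  // Hypothetical helper: lazily yields one Map per <recordTag> element,
  // mapping each direct child element's name to its text content.
  // Assumes the children of a record contain text only (no nested elements).
  def records(reader: XMLStreamReader, recordTag: String): Iterator[Map[String, String]] =
    new Iterator[Map[String, String]] {
      // True once the cursor sits on the start tag of an unread record.
      private var ready = false

      // Advance the cursor to the next <recordTag> start tag, if there is one.
      def hasNext: Boolean = {
        while (!ready && reader.hasNext) {
          if (reader.next() == XMLStreamConstants.START_ELEMENT &&
              reader.getLocalName == recordTag) ready = true
        }
        ready
      }

      def next(): Map[String, String] = {
        if (!hasNext) throw new NoSuchElementException("no more records")
        ready = false
        var fields = Map.empty[String, String]
        var done = false
        while (!done) {
          reader.next() match {
            case XMLStreamConstants.START_ELEMENT =>
              val name = reader.getLocalName
              // getElementText leaves the cursor on this child's end tag.
              fields += (name -> reader.getElementText)
            case XMLStreamConstants.END_ELEMENT =>
              done = true // end tag of the record itself
            case _ => // skip whitespace, comments, etc.
          }
        }
        fields
      }
    }

  def main(args: Array[String]): Unit = {
    val xml =
      "<employees>" +
        "<employee><firstName>Kermit</firstName><role>Singer</role></employee>" +
        "<employee><firstName>Oscar</firstName><role>Garbageman</role></employee>" +
      "</employees>"
    val reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml))
    records(reader, "employee").foreach(println)
  }
}
```

Only one record is held at a time and no second thread is involved, but every record has to be rebuilt by hand from the cursor events, which is exactly the inconvenience described above.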

Upvotes: 9

Views: 1767

Answers (2)

David Soergel

Reputation: 1699

To get from huynhjl's generator solution to a TraversableOnce[Node], use this trick:

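// Wraps a callback-style generator (a function that feeds each element to
// a handler) in a Traversable whose foreach simply runs the generator.
def generatorToTraversable[T](func: (T => Unit) => Unit): Traversable[T] =
  new Traversable[T] {
    def foreach[X](f: T => X): Unit = {
      func(f(_)) // the result of f is discarded
    }
  }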

def firstLevelNodes(input: Source): TraversableOnce[Node] =
  generatorToTraversable(processSource(input))

The result of generatorToTraversable is not traversable more than once (even though a new ConstructingParser is instantiated on each foreach call) because the input stream is a Source, which is an Iterator. We can't override Traversable.isTraversableAgain, though, because it's final.

Really we'd like to enforce this by just returning an Iterator. However, both Traversable.toIterator and Traversable.view.toIterator make an intermediate Stream, which will cache all the entries (defeating the whole purpose of this exercise). Oh well; I'll just let the stream throw an exception if it's accessed twice.
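
If a real Iterator is needed without caching every entry, one possible workaround (a swapped-in technique, not something either answer here uses) is to run the generator on a background thread and hand elements over through a small bounded queue. The sketch below uses only the JDK and the Scala standard library; GeneratorIterator, generatorToIterator and the capacity parameter are hypothetical names. Note that it reintroduces a second thread, and that the producer stays blocked if the consumer abandons the iterator early.

```scala
import java.util.concurrent.ArrayBlockingQueue

object GeneratorIterator {
  // Hypothetical helper: turns a callback-style generator into an Iterator.
  // At most `capacity` elements are buffered at any time.
  def generatorToIterator[T](capacity: Int = 16)(gen: (T => Unit) => Unit): Iterator[T] = {
    // Right(x) carries an element; Left(None) marks normal completion;
    // Left(Some(e)) carries a failure raised by the generator.
    val queue = new ArrayBlockingQueue[Either[Option[Throwable], T]](capacity)

    val producer = new Thread(new Runnable {
      def run(): Unit = {
        val end: Option[Throwable] =
          try { gen(x => queue.put(Right(x))); None }
          catch { case e: Throwable => Some(e) }
        queue.put(Left(end))
      }
    })
    producer.setDaemon(true) // do not keep the JVM alive for an abandoned iterator
    producer.start()

    new Iterator[T] {
      // The next message taken from the queue but not yet consumed.
      private var pending: Option[Either[Option[Throwable], T]] = None

      private def peek(): Either[Option[Throwable], T] = {
        if (pending.isEmpty) pending = Some(queue.take()) // blocks until the producer delivers
        pending.get
      }

      def hasNext: Boolean = peek() match {
        case Right(_)      => true
        case Left(None)    => false
        case Left(Some(e)) => throw e
      }

      def next(): T = peek() match {
        case Right(x)      => pending = None; x
        case Left(None)    => throw new NoSuchElementException("generator exhausted")
        case Left(Some(e)) => throw e
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // A toy generator standing in for a callback-based parser.
    val it = generatorToIterator[Int]()(emit => (1 to 5).foreach(emit))
    println(it.map(_ * 2).toList)
  }
}
```

The bounded queue is what keeps memory constant: the generator blocks in put once `capacity` elements are waiting, so it cannot run ahead of the consumer.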

Also note that the whole thing isn't thread-safe.

This code runs great, and I believe the overall solution to be both lazy and not caching (hence constant memory), though I haven't tried it on a large input yet.

Upvotes: 5

huynhjl

Reputation: 41646

You can use the underlying parser used by XMLEventReader through ConstructingParser and process your employee nodes below the top level with a callback. You just have to be careful to discard the data as soon as it has been processed:

import scala.io.Source
import scala.xml._

def processSource[T](input: Source)(f: NodeSeq => T): Unit = {
  new scala.xml.parsing.ConstructingParser(input, false) {
    var depth = 0 // track element nesting depth; declared before parsing starts

    nextch // initialize per documentation
    document // trigger parsing by requesting document

    override def elemStart(pos: Int, pre: String, label: String,
        attrs: MetaData, scope: NamespaceBinding): Unit = {
      super.elemStart(pos, pre, label, attrs, scope)
      depth += 1
    }
    override def elemEnd(pos: Int, pre: String, label: String): Unit = {
      depth -= 1
      super.elemEnd(pos, pre, label)
    }
    // Note: newer scala-xml versions add an `empty: Boolean` parameter
    // before `nodes`; adjust this signature to match the version in use.
    override def elem(pos: Int, pre: String, label: String, attrs: MetaData,
        pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = {
      val node = super.elem(pos, pre, label, attrs, pscope, nodes)
      depth match {
        case 1 => <dummy/> // dummy final roll up
        case 2 => f(node); NodeSeq.Empty // process and discard employee nodes
        case _ => node // roll up other nodes
      }
    }
  }
}

Then use it like this to process each node at the second level in constant memory (assuming the nodes at the second level don't have an arbitrary number of children):

processSource(src){ node =>
  // process here
  println(node)
}

The benefit compared to XMLEventReader is that you don't use two threads. Also you don't have to parse the node twice compared to your proposed solution. The drawback is that this relies on the inner workings of ConstructingParser.

Upvotes: 8
