Reputation: 1699
I need to process XML documents that consist of a very large number of independent records, e.g.
<employees>
  <employee>
    <firstName>Kermit</firstName>
    <lastName>Frog</lastName>
    <role>Singer</role>
  </employee>
  <employee>
    <firstName>Oscar</firstName>
    <lastName>Grouch</lastName>
    <role>Garbageman</role>
  </employee>
  ...
</employees>
In some cases these are just big files, but in others they may come from a streaming source.
I can't just scala.xml.XML.load() it, because I don't want to hold the whole document in memory (or wait for the input stream to close) when I only need to work with one record at a time. I know I can use XMLEventReader to stream the input as a sequence of XMLEvents, but these are much less convenient to work with than scala.xml.Node.
So I'd like to get a lazy Iterator[Node] out of this somehow, in order to operate on each individual record using the convenient Scala syntax, while keeping memory usage under control.
To do this myself, I could start with an XMLEventReader, build up a buffer of events between each matching start and end tag, and then construct a Node tree from that. But is there an easier way that I've overlooked? Thanks for any insights!
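For concreteness, here's the sort of thing I have in mind, as a sketch only (recordIterator and its internals are made up for illustration; namespaces, entity references, comments, and processing instructions are simply dropped):

import scala.io.Source
import scala.xml._
import scala.xml.pull._

def recordIterator(source: Source, recordTag: String): Iterator[Node] = {
  val events = new XMLEventReader(source) // runs the parser on its own thread

  // Rebuild the children of the element whose start tag was just
  // consumed, stopping at (and consuming) the matching end tag.
  def readChildren(): List[Node] = events.next() match {
    case EvElemStart(_, label, attrs, _) =>
      Elem(null, label, attrs, TopScope, readChildren(): _*) :: readChildren()
    case EvText(text)    => Text(text) :: readChildren()
    case EvElemEnd(_, _) => Nil
    case _               => readChildren() // drop comments, PIs, entity refs
  }

  new Iterator[Node] {
    // Scan forward to the start tag of the next record, if any.
    private def advance(): Option[MetaData] = {
      while (events.hasNext) events.next() match {
        case EvElemStart(_, `recordTag`, attrs, _) => return Some(attrs)
        case _ => // keep scanning
      }
      None
    }
    private var pending = advance()
    def hasNext = pending.isDefined
    def next() = {
      val attrs = pending.getOrElse(throw new NoSuchElementException)
      val record = Elem(null, recordTag, attrs, TopScope, readChildren(): _*)
      pending = advance()
      record
    }
  }
}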
Upvotes: 9
Views: 1767
Reputation: 1699
To get from huynhjl's generator solution to a TraversableOnce[Node], use this trick:
def generatorToTraversable[T](func: (T => Unit) => Unit) =
  new Traversable[T] {
    // Each foreach call re-runs the generator, with f as its callback
    def foreach[X](f: T => X) {
      func(f(_))
    }
  }

def firstLevelNodes(input: Source): TraversableOnce[Node] =
  generatorToTraversable(processSource(input))
The result of generatorToTraversable is not traversable more than once (even though a new ConstructingParser is instantiated on each foreach call) because the input stream is a Source, which is an Iterator. We can't override Traversable.isTraversableAgain, though, because it's final.
Really we'd like to enforce this by just returning an Iterator. However, both Traversable.toIterator and Traversable.view.toIterator build an intermediate Stream, which memoizes all the entries (defeating the whole purpose of this exercise). Oh well; I'll just let the underlying Source throw an exception if it's traversed twice.
Also note the whole thing isn't thread safe.
This code runs great, and I believe the overall solution to be both lazy and not caching (hence constant memory), though I haven't tried it on a large input yet.
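For completeness, a hypothetical use (assuming processSource from huynhjl's answer below is in scope; the file name is made up):

import scala.io.Source

val records = firstLevelNodes(Source.fromFile("employees.xml"))
records.foreach(node => println((node \ "firstName").text)) // first traversal: fine
// Traversing records a second time would fail here, since the
// underlying Source has already been exhausted.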
Upvotes: 5
Reputation: 41646
You can use the underlying parser used by XMLEventReader through ConstructingParser, and process your employee nodes below the top level with a callback. You just have to be careful to discard the data as soon as it's processed:
import scala.io.Source
import scala.xml._

def processSource[T](input: Source)(f: NodeSeq => T) {
  new scala.xml.parsing.ConstructingParser(input, false) {
    nextch // initialize per documentation
    document // trigger parsing by requesting document

    var depth = 0 // track depth

    override def elemStart(pos: Int, pre: String, label: String,
        attrs: MetaData, scope: NamespaceBinding) {
      super.elemStart(pos, pre, label, attrs, scope)
      depth += 1
    }
    override def elemEnd(pos: Int, pre: String, label: String) {
      depth -= 1
      super.elemEnd(pos, pre, label)
    }
    override def elem(pos: Int, pre: String, label: String, attrs: MetaData,
        pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = {
      val node = super.elem(pos, pre, label, attrs, pscope, nodes)
      depth match {
        case 1 => <dummy/> // dummy final roll up for the root
        case 2 => f(node); NodeSeq.Empty // process and discard employee nodes
        case _ => node // roll up other nodes
      }
    }
  }
}
Then you use it like this to process each node at the second level in constant memory (assuming the second-level nodes don't themselves have an arbitrarily large number of children):
processSource(src) { node =>
  // process here
  println(node)
}
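With the sample document from the question (and preserveWS set to false, so inter-element whitespace is dropped), the output should look roughly like:

<employee><firstName>Kermit</firstName><lastName>Frog</lastName><role>Singer</role></employee>
<employee><firstName>Oscar</firstName><lastName>Grouch</lastName><role>Garbageman</role></employee>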
The benefit compared to XMLEventReader is that you don't use two threads. Also, unlike your proposed solution, you don't have to parse each node twice. The drawback is that this relies on the inner workings of ConstructingParser.
Upvotes: 8