Reputation: 11264
Learning spark and scala. I have a snippet that process xml literal. But when I try to load the xml from file, I couldn't make it work. Probably I am missing a key understanding. Would appreciate some help. I am using cloudera VM and it has spark 1.6 & scala 2.10.5.
Scenario: Read xml, extract id, name and display as id@name.
scala> import scala.xml._
scala> val strxml = <employees>
| <employee><id>1</id><name>chris</name></employee>
| <employee><id>2</id><name>adam</name></employee>
| <employee><id>3</id><name>karl</name></employee>
| </employees>
strxml: scala.xml.Elem =
<employees>
<employee><id>1</id><name>chris</name></employee>
<employee><id>2</id><name>adam</name></employee>
<employee><id>3</id><name>karl</name></employee>
</employees>
scala> val t = strxml.flatMap(line => line \\ "employee")
t: scala.xml.NodeSeq = NodeSeq(<employee><id>1</id><name>chris</name></employee>, <employee><id>2</id><name>adam</name></employee>, <employee><id>3</id><name>karl</name></employee>)
scala> t.map(l => (l \\ "id").text + "@" + (l \\ "name").text).foreach(println)
1@chris
2@adam
3@karl
Loading it from a file (exception thrown; What am I doing wrong here?)
scala> val filexml = sc.wholeTextFiles("file:///home/cloudera/test*")
filexml: org.apache.spark.rdd.RDD[(String, String)] = file:///home/cloudera/test* MapPartitionsRDD[66] at wholeTextFiles at <console>:30
scala> val lines = filexml.map(line => XML.loadString(line._2))
lines: org.apache.spark.rdd.RDD[scala.xml.Elem] = MapPartitionsRDD[89] at map at <console>:32
scala> val ft = lines.map(l => l \\ "employee")
ft: org.apache.spark.rdd.RDD[scala.xml.NodeSeq] = MapPartitionsRDD[99] at map at <console>:34
scala> ft.map(l => (l \\ "id").text + "@" + (l \\ "name").text).foreach(println)
Exception in task 0.0 in stage 63.0 (TID 63)
org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 1; Content is not allowed in prolog
Contents of files
test.xml
<employees>
<employee><id>1</id><name>chris</name></employee>
<employee><id>2</id><name>adam</name></employee>
<employee><id>3</id><name>karl</name></employee>
</employees>
test2.xml
<employees>
<employee><id>4</id><name>hive</name></employee>
<employee><id>5</id><name>elixir</name></employee>
<employee><id>6</id><name>spark</name></employee>
</employees>
Upvotes: 2
Views: 4553
Reputation: 11264
Answering my own question.
scala> val filexml = sc.wholeTextFiles("file:///Volumes/BigData/sample_data/test*.xml")
filexml: org.apache.spark.rdd.RDD[(String, String)] = file:///Volumes/BigData/sample_data/test*.xml MapPartitionsRDD[1] at wholeTextFiles at <console>:24
scala> val lines = filexml.flatMap(line => XML.loadString(line._2) \\ "employee")
lines: org.apache.spark.rdd.RDD[scala.xml.Node] = MapPartitionsRDD[3] at flatMap at <console>:29
scala> lines.map(line => (line \\ "id").text + "@" + (line \\ "name").text).foreach(println)
1@chris
2@adam
3@karl
4@hive
5@elixir
6@spark
Upvotes: 2
Reputation: 1775
This is the java code for processing XML data in Spark, convert this to as per your requirement.
package packagename;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import com.databricks.spark.xml.XmlReader;
public class XmlreaderSpark {
public static void main(String arr[]){
String localxml="file path";
String booksFileTag = "user";
String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse";
System.out.println("warehouseLocation" + warehouseLocation);
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Java Spark SQL Example")
.config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true")
.getOrCreate();
SQLContext sqlContext = new SQLContext(spark);
Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml);
df.show();
}
}
You need to add following dependency.
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-xml_2.10</artifactId>
<version>0.4.0</version>
</dependency>
Upvotes: 0