Bala

Reputation: 11264

How do I process an XML file in Spark?

I am learning Spark and Scala. I have a snippet that processes an XML literal, but when I try to load the XML from a file I can't make it work. I am probably missing something fundamental and would appreciate some help. I am using the Cloudera VM, which has Spark 1.6 and Scala 2.10.5.

Scenario: read the XML, extract id and name, and display them as id@name.

scala> import scala.xml._
scala> val strxml = <employees>
     | <employee><id>1</id><name>chris</name></employee>
     | <employee><id>2</id><name>adam</name></employee>
     | <employee><id>3</id><name>karl</name></employee>
     | </employees>
strxml: scala.xml.Elem = 
<employees>
<employee><id>1</id><name>chris</name></employee>
<employee><id>2</id><name>adam</name></employee>
<employee><id>3</id><name>karl</name></employee>
</employees>

scala> val t = strxml.flatMap(line => line \\ "employee")
t: scala.xml.NodeSeq = NodeSeq(<employee><id>1</id><name>chris</name></employee>, <employee><id>2</id><name>adam</name></employee>, <employee><id>3</id><name>karl</name></employee>)

scala> t.map(l => (l \\ "id").text + "@" + (l \\ "name").text).foreach(println)
1@chris
2@adam
3@karl

Loading it from a file throws an exception. What am I doing wrong here?

scala> val filexml = sc.wholeTextFiles("file:///home/cloudera/test*")
filexml: org.apache.spark.rdd.RDD[(String, String)] = file:///home/cloudera/test* MapPartitionsRDD[66] at wholeTextFiles at <console>:30

scala> val lines = filexml.map(line => XML.loadString(line._2))
lines: org.apache.spark.rdd.RDD[scala.xml.Elem] = MapPartitionsRDD[89] at map at <console>:32

scala> val ft = lines.map(l => l \\ "employee")
ft: org.apache.spark.rdd.RDD[scala.xml.NodeSeq] = MapPartitionsRDD[99] at map at <console>:34

scala> ft.map(l => (l \\ "id").text + "@" + (l \\ "name").text).foreach(println)

Exception in task 0.0 in stage 63.0 (TID 63)
org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 1; Content is not allowed in prolog

Contents of files

test.xml

<employees>
<employee><id>1</id><name>chris</name></employee>
<employee><id>2</id><name>adam</name></employee>
<employee><id>3</id><name>karl</name></employee>
</employees>

test2.xml

<employees>
<employee><id>4</id><name>hive</name></employee>
<employee><id>5</id><name>elixir</name></employee>
<employee><id>6</id><name>spark</name></employee>
</employees>

Upvotes: 2

Views: 4553

Answers (2)

Bala

Reputation: 11264

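Answering my own question. The code below differs from the code in the question in two ways. First, the glob is narrowed from test* to test*.xml. "Content is not allowed in prolog" means the parser met non-XML characters before the root element, so a broad glob that also matches a non-XML file is a likely cause, although I have not confirmed that this is what happened here. Second, flatMap replaces map, so the RDD holds one Node per employee element instead of one NodeSeq per file.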

scala> val filexml = sc.wholeTextFiles("file:///Volumes/BigData/sample_data/test*.xml")
filexml: org.apache.spark.rdd.RDD[(String, String)] = file:///Volumes/BigData/sample_data/test*.xml MapPartitionsRDD[1] at wholeTextFiles at <console>:24

scala> val lines = filexml.flatMap(line => XML.loadString(line._2) \\ "employee")
lines: org.apache.spark.rdd.RDD[scala.xml.Node] = MapPartitionsRDD[3] at flatMap at <console>:29

scala> lines.map(line => (line \\ "id").text + "@" + (line \\ "name").text).foreach(println)
1@chris
2@adam
3@karl
4@hive
5@elixir
6@spark
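
Why flatMap matters here can be seen without Spark. The following is a minimal sketch, assuming plain Scala collections stand in for the RDD and that scala-xml is on the classpath (it is in spark-shell). The names docs, perDocument and perEmployee are made up for illustration. Calling .text on a NodeSeq concatenates the text of every match, so keeping one NodeSeq per document merges all ids and all names of that document into a single string.

```scala
import scala.xml.XML

// Two in-memory documents standing in for the file contents
// (the second element of each pair returned by wholeTextFiles).
val docs = Seq(
  "<employees><employee><id>1</id><name>chris</name></employee>" +
    "<employee><id>2</id><name>adam</name></employee></employees>",
  "<employees><employee><id>4</id><name>hive</name></employee></employees>"
)

// map: one NodeSeq per document, so .text joins every id (and every name)
// found in that document into one string.
val perDocument = docs
  .map(d => XML.loadString(d) \\ "employee")
  .map(ns => (ns \\ "id").text + "@" + (ns \\ "name").text)

// flatMap: one Node per <employee>, so each record is formatted on its own.
val perEmployee = docs
  .flatMap(d => XML.loadString(d) \\ "employee")
  .map(n => (n \\ "id").text + "@" + (n \\ "name").text)

perDocument.foreach(println) // 12@chrisadam, then 4@hive
perEmployee.foreach(println) // 1@chris, 2@adam, 4@hive
```

The same reasoning should carry over to the RDD version, since RDD.flatMap flattens the NodeSeq returned for each file in the same way.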

Upvotes: 2

ROOT

Reputation: 1775

This is Java code for processing XML data in Spark with the spark-xml library; adapt it to your requirements.

package packagename;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import com.databricks.spark.xml.XmlReader;

public class XmlreaderSpark {
    public static void main(String[] args) {
        String localxml = "file path";  // path to the XML input
        String rowTag = "user";         // element that marks one row, e.g. "employee" for the question's data

        // user.dir has no trailing separator, so add one before the directory name.
        String warehouseLocation = "file:" + System.getProperty("user.dir") + "/spark-warehouse";
        System.out.println("warehouseLocation: " + warehouseLocation);

        SparkSession spark = SparkSession
                .builder()
                .master("local")
                .appName("Java Spark SQL Example")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                // The config key must not include a leading "set ".
                .config("spark.sql.crossJoin.enabled", "true")
                // Needs the spark-hive classes on the classpath.
                .enableHiveSupport()
                .getOrCreate();

        // Reuse the session's SQLContext rather than constructing a new one.
        SQLContext sqlContext = spark.sqlContext();

        // Each <rowTag> element becomes one row of the resulting DataFrame.
        Dataset<Row> df = new XmlReader().withRowTag(rowTag).xmlFile(sqlContext, localxml);
        df.show();

        spark.stop();
    }
}

You need to add the following dependency. The suffix of the artifactId (_2.10 here) must match the Scala version your Spark build uses.

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-xml_2.10</artifactId>
   <version>0.4.0</version>
</dependency>

Upvotes: 0
