SanjanaSanju

Reputation: 297

Unable to insert parsed XML data into Delta tables in Spark with a changing input schema

I am trying to insert data from a dataframe into a Delta table. First, I parse an XML file based on a target schema and save the result into a dataframe. Below is the code used for parsing.

def parseAsset(nodeSeqXml: scala.xml.NodeSeq): Seq[String] = {
  // Extract the fixed attributes from the AMS element, then collapse the
  // repeated App_Data Name/Value pairs into one "!-"-delimited string.
  Seq(
    (nodeSeqXml \ "AMS").\@("Pro"),
    (nodeSeqXml \ "AMS").\@("Prod"),
    (nodeSeqXml \ "AMS").\@("Asset"),
    (nodeSeqXml \ "AMS").\@("Descrn"),
    (nodeSeqXml \ "AMS").\@("Creation_Dt"),
    (nodeSeqXml \ "AMS").\@("Provider"),
    (nodeSeqXml \ "AMS").\@("AssetD"),
    (nodeSeqXml \ "AMS").\@("lass"),
    (nodeSeqXml \ "AMS").\@("hyu"),
    (nodeSeqXml \ "App_Data")
      .map(d => (d \\ "@Name").text + "@-" + (d \\ "@Value").text)
      .mkString("!-")
  )
}
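For reference, this is the Metadata shape the parser assumes (a made-up sample; the attribute values are placeholders), which can be exercised with XML.loadString:

import scala.xml.XML

// Hypothetical input: one AMS element carrying the fixed attributes,
// plus repeated App_Data Name/Value pairs.
val sample = XML.loadString(
  """<Metadata>
    |  <AMS Pro="p1" Prod="pr1" Asset="a1" Descrn="d1" Creation_Dt="2021-01-01"
    |       Provider="prov1" AssetD="ad1" lass="c1" hyu="h1"/>
    |  <App_Data Name="Title" Value="Some title"/>
    |  <App_Data Name="Genre" Value="Drama"/>
    |</Metadata>""".stripMargin)

parseAsset(sample)
// => Seq("p1", "pr1", "a1", ..., "Title@-Some title!-Genre@-Drama")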


import scala.xml.XML

val AssetXml = XML.loadFile("filepath/filename")

// One record per Metadata element, with the parsed fields joined by "%-".
val metadataNodeSeqLst = AssetXml \\ "Metadata"
val records: Seq[String] =
  metadataNodeSeqLst.map(nodeSeqXml => parseAsset(nodeSeqXml).mkString("%-"))

// toDF on a local Seq needs the SparkSession implicits in scope.
import spark.implicits._
val AssetDF = records.toDF("ETY_Asset")

After this step I split the delimited column, explode the array columns, and save the result into a dataframe called outputparse. After that I insert this data into the Delta table using the line below.

outputparse.write.format("delta").mode("append").option("mergeSchema", "true").insertInto("targettable")
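For context, the splitting and exploding that produces outputparse is roughly along these lines (a simplified sketch, not my exact code; the column names other than ETY_Asset are placeholders):

import org.apache.spark.sql.functions.{col, explode, first, split}

// Split each "%-"-delimited record into its fixed fields plus the
// App_Data blob (item 9).
val splitDF = AssetDF
  .withColumn("fields", split(col("ETY_Asset"), "%-"))
  .select(
    col("fields").getItem(0).as("Pro"),
    col("fields").getItem(1).as("Prod"),
    // ... remaining fixed AMS columns ...
    col("fields").getItem(9).as("app_data"))

// One row per App_Data Name/Value pair.
val pairs = splitDF
  .withColumn("pair", explode(split(col("app_data"), "!-")))
  .withColumn("name", split(col("pair"), "@-").getItem(0))
  .withColumn("value", split(col("pair"), "@-").getItem(1))

// Pivoting the App_Data names into columns is what makes the output
// schema vary from file to file.
val outputparse = pairs
  .groupBy(splitDF.columns.filter(_ != "app_data").map(col): _*)
  .pivot("name")
  .agg(first("value"))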

This insert works fine if the source file has the same number of columns as the target. But in this case, different files with different schemas will be passed as input to the parsing code. For instance, the target schema has 74 columns; when an incoming file yields only 65 columns, inserting into the Delta table fails with the error below.

org.apache.spark.sql.AnalysisException: Cannot write to 'target', not enough data columns; target table has 74 column(s) but the inserted data has 65 column(s);

I get files with different input schemas like this, but my target schema is constant, so essentially I need to pass null for the missing fields. I understand that I need to do a schema comparison before writing the data. Could you please let me know how to achieve this, and where to incorporate this logic in my parsing code?
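The direction I have in mind is the below (an untested sketch): read the target table's schema, add every target column missing from the dataframe as a typed null, and select the columns in the target's order, since insertInto matches columns by position rather than by name.

import org.apache.spark.sql.functions.{col, lit}

val targetSchema = spark.table("targettable").schema

// Add every target column missing from the parsed dataframe as a
// null of the matching type.
val aligned = targetSchema.fields.foldLeft(outputparse) { (df, field) =>
  if (df.columns.contains(field.name)) df
  else df.withColumn(field.name, lit(null).cast(field.dataType))
}

// insertInto resolves columns by position, so order them to match
// the target table before writing.
aligned
  .select(targetSchema.fieldNames.map(col): _*)
  .write
  .mode("append")
  .insertInto("targettable")

Would something like this be the right approach, and is this the right place to hook it in, or should the comparison live inside the parsing code itself?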

Upvotes: 0

Views: 599

Answers (0)
