Sudarshan kumar

Reputation: 1585

How to mirror schema in Spark data frame

I want to create a data frame that has the schema of another data frame.

This is my code, in which I want to create the mirror schema.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// explode and col are used below, so import them alongside udf
import org.apache.spark.sql.functions.{col, explode, udf}


val getFFActionParent =  udf { (FFAction: String) => 
    if (FFAction=="Insert") "I|!|"
    else if (FFAction=="Overwrite") "I|!|"
    else "D|!|" 
}

val getFFActionChild =  udf { (FFAction: String) => 
    if (FFAction=="Insert") "I|!|"
    else if (FFAction=="Overwrite") "O|!|"
    else "D|!|" 
}

val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")


val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val FinancialSourceDF=dfContentItem.select($"env:Data.sr:Source.*",getFFActionParent($"_action").as("FFAction")).filter($"env:Data.sr:Source._organizationId".isNotNull).drop("sr:Auditors")
val FinancialSourceDFFinal=dfContentItem.select($"env:Data.sr:Source._organizationId".as("organizationId"), $"env:Data.sr:Source._sourceId".as("sourceId"), $"env:Data.sr:Source.*",getFFActionParent($"_action").as("FFAction")).filter($"env:Data.sr:Source._organizationId".isNotNull).drop("sr:Auditors")

The schema will be mirrored from the data frame below.

val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialSource/INCR")
val header1 = rdd1.filter(_.contains("Source.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val MirrorSchemaschema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)

Here I am trying to create the mirror schema, but I am not able to.

val FinancialSourceSaveDF=FinancialSourceDF.select(FinancialSourceDF.columns.filter(x => !x.equals("sr:Auditors")).map(x => col(x).as(x.replace("_", "Source.").replace("sr:", ""))): _*)

FinancialSourceSaveDF.select(MirrorSchemaschema1.fieldNames.map(col): _*).show(false)

NOTE: The columns in the FinancialSourceSaveDF data frame may change, i.e. sometimes all columns will be present and sometimes only some of them. Columns that are not present should be filled with null values.

Upvotes: 0

Views: 287

Answers (1)

Ramesh Maharjan

Reputation: 41957

After you get dfContentItem, you can simply select the elements you need from the Source struct and rename the columns using an alias, as below.

val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val dfType=dfContentItem.select($"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
val temp = dfType.select(dfType.columns.filter(x => !x.equals("sr:Auditors")).map(x => col(x).as(x.replace("_", "Source_").replace("sr:", ""))): _*)

You can use the udf functions you've mentioned instead, if you prefer.

The columns required for the schema are read from the following header line:

Source.organizationId|^|Source.sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!|
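As a quick check of how that header turns into schema field names, here is a minimal plain-Scala sketch that applies the same split and rename as MirrorSchemaschema1 in the question. The header is shortened, and the object name HeaderFields is only for illustration.

```scala
object HeaderFields {
  // Shortened copy of the header line above; the real one has more fields.
  val header: String =
    "Source.organizationId|^|Source.sourceId|^|FilingDateTime|^|FFAction|!|"

  // Split on the literal "|^|" delimiter, then replace dots with underscores,
  // as the question does when it builds MirrorSchemaschema1.
  val fieldNames: Array[String] =
    header.split("\\|\\^\\|").map(_.replace(".", "_"))

  def main(args: Array[String]): Unit =
    fieldNames.foreach(println)
}
```

Note that the trailing "|!|" is not a "|^|" delimiter, so it stays part of the last field name. That is why the action column above is aliased to "FFAction|!|".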

Your main problem now is that you don't know how many columns of the reference schema might be missing from the new data frame. To handle that, take the difference between the two sets of column names and use foldLeft to add each missing column filled with null values. Here schema is the reference StructType (MirrorSchemaschema1 in the question), and lit comes from org.apache.spark.sql.functions:

val diff = schema.fieldNames.diff(temp.schema.fieldNames)
val finaldf = diff.foldLeft(temp){(temp2df, colName) => temp2df.withColumn(colName, lit(null))}

Then select the columns in the order of the reference schema to get the mirrored schema, as below.

finaldf.select(schema.fieldNames.map(col): _*).show(false)

You should get output like this:

+---------------------+---------------+-----------------------+--------------+----------+----+---------+-----------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|Source_organizationId|Source_sourceId|FilingDateTime         |SourceTypeCode|DocumentId|Dcn |DocFormat|StatementDate          |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
+---------------------+---------------+-----------------------+--------------+----------+----+---------+-----------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|4295906830           |344            |20171111T17:00:00+00:00|10K           |null      |null|null     |20171030T00:00:00+00:00|false                    |false                  |20171030T00:00:00+00:00    |1.0                       |false              |300                    |SS                  |1                       |3011835     |1000716240            |Overwrite  |
+---------------------+---------------+-----------------------+--------------+----------+----+---------+-----------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
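The steps above can also be wrapped into one helper. The following is only a sketch under some assumptions: it uses a local SparkSession instead of the SQLContext from the question, the names MirrorSchemaSketch and mirror are made up for illustration, and the null columns are cast to StringType because every field of the reference schema in the question is a StringType.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object MirrorSchemaSketch {
  // Adds every field of `schema` that `df` lacks as a null string column,
  // then selects the columns in the order given by `schema`.
  def mirror(df: DataFrame, schema: StructType): DataFrame = {
    val missing = schema.fieldNames.diff(df.columns)
    val filled = missing.foldLeft(df) { (acc, name) =>
      // Assumption: the reference schema is all strings, so cast the null.
      acc.withColumn(name, lit(null).cast(StringType))
    }
    filled.select(schema.fieldNames.map(col): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("mirror-schema-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical, shortened reference schema.
    val schema = StructType(
      Seq("Source_organizationId", "Source_sourceId", "DocumentId", "FFAction|!|")
        .map(name => StructField(name, StringType)))

    // Input that lacks DocumentId and has its columns in a different order.
    val temp = Seq(("Overwrite", "4295906830", "344"))
      .toDF("FFAction|!|", "Source_organizationId", "Source_sourceId")

    val result = mirror(temp, schema)
    result.printSchema()
    result.show(false)

    spark.stop()
  }
}
```

Casting the null literal keeps the added columns from ending up with NullType, which some output formats reject. If the reference schema had mixed types, the cast could use each field's own dataType instead.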

I hope the answer is helpful

Upvotes: 2
