Reputation: 1585
I want to create a data frame that has the same schema as another data frame.
This is my code, in which I want to create the mirror schema.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, explode, udf}
val getFFActionParent = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "I|!|"
else "D|!|"
}
val getFFActionChild = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "O|!|"
else "D|!|"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val FinancialSourceDF=dfContentItem.select($"env:Data.sr:Source.*",getFFActionParent($"_action").as("FFAction")).filter($"env:Data.sr:Source._organizationId".isNotNull).drop("sr:Auditors")
val FinancialSourceDFFinal=dfContentItem.select($"env:Data.sr:Source._organizationId".as("organizationId"), $"env:Data.sr:Source._sourceId".as("sourceId"), $"env:Data.sr:Source.*",getFFActionParent($"_action").as("FFAction")).filter($"env:Data.sr:Source._organizationId".isNotNull).drop("sr:Auditors")
The schema will be mirrored from the data frame below:
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialSource/INCR")
val header1 = rdd1.filter(_.contains("Source.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val MirrorSchemaschema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
Here I am trying to create the mirror schema, but I am not able to:
val FinancialSourceSaveDF=FinancialSourceDF.select(FinancialSourceDF.columns.filter(x => !x.equals("sr:Auditors")).map(x => col(x).as(x.replace("_", "Source.").replace("sr:", ""))): _*)
FinancialSourceSaveDF.select(MirrorSchemaschema1.fieldNames.map(col): _*).show(false)
NOTE: The columns in the FinancialSourceSaveDF data frame may change, i.e. sometimes all columns will be present and sometimes only some of them. Columns that are not present should be filled with null values.
Upvotes: 0
Views: 287
Reputation: 41957
After you get dfContentItem, you can simply select the elements you require from the Source struct and rename the columns using alias, as below.
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val dfType=dfContentItem.select($"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
val temp = dfType.select(dfType.columns.filter(x => !x.equals("sr:Auditors")).map(x => col(x).as(x.replace("_", "Source_").replace("sr:", ""))): _*)
You can use the udf function you've mentioned instead, if you prefer.
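As a rough illustration of that remark, the sketch below applies the same mapping as getFFActionParent from the question while aliasing the column, written with the built-in when/otherwise rather than a udf. The helper name ffActionParent, the local SparkSession (Spark 2.0 or later is assumed) and the toy input are assumptions made for the example, not part of the original code.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{col, when}

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("ffaction-sketch").getOrCreate()
import spark.implicits._

// Hypothetical helper: same mapping as getFFActionParent in the question,
// expressed with built-in column functions instead of a udf.
def ffActionParent(action: Column): Column =
  when(action === "Insert", "I|!|")
    .when(action === "Overwrite", "I|!|")
    .otherwise("D|!|")

// Toy stand-in for the "_action" attribute of dfContentItem.
val actions = Seq("Insert", "Overwrite", "Delete").toDF("_action")

// Map the action and give the column the name expected by the target schema.
val mapped = actions.select(ffActionParent(col("_action")).as("FFAction|!|"))
val mappedValues = mapped.collect().map(_.getString(0)).toSeq
```

In the real pipeline the same expression would take the place of $"_action".as("FFAction|!|") in the select on dfContentItem.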
Next you need the target schema (MirrorSchemaschema1 in your question, referred to as schema below), whose column names are read from this header line:
Source.organizationId|^|Source.sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!|
Now your main problem is that you don't know how many column names from the referenced schema might be missing in the new dataframe. For that, you can find the difference between the two sets of column names and use the foldLeft function to populate the missing columns with null values (lit comes from org.apache.spark.sql.functions):
val diff = schema.fieldNames.diff(temp.schema.fieldNames)
val finaldf = diff.foldLeft(temp){(temp2df, colName) => temp2df.withColumn(colName, lit(null))}
Then you can just select the columns in schema order to get the mirror schema, as below:
finaldf.select(schema.fieldNames.map(col): _*).show(false)
You should get output like this:
+---------------------+---------------+-----------------------+--------------+----------+----+---------+-----------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|Source_organizationId|Source_sourceId|FilingDateTime         |SourceTypeCode|DocumentId|Dcn |DocFormat|StatementDate          |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
+---------------------+---------------+-----------------------+--------------+----------+----+---------+-----------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|4295906830           |344            |20171111T17:00:00+00:00|10K           |null      |null|null     |20171030T00:00:00+00:00|false                    |false                  |20171030T00:00:00+00:00    |1.0                       |false              |300                    |SS                  |1                       |3011835     |1000716240            |Overwrite  |
+---------------------+---------------+-----------------------+--------------+----------+----+---------+-----------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
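The whole approach can be tried end to end on toy data. The following is only a sketch under stated assumptions: the shortened header line, the sample row and the local SparkSession (Spark 2.0 or later) are made up for illustration. It also differs from the snippet above in one respect: each missing column is cast to the type declared in the schema, because a bare lit(null) produces a NullType column, which some output formats may reject.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("mirror-schema-sketch").getOrCreate()
import spark.implicits._

// Target schema built from a shortened, hypothetical header line,
// using the same split and rename as in the question.
val header = "Source.organizationId|^|Source.sourceId|^|Dcn|^|DocFormat"
val schema = StructType(
  header.split("\\|\\^\\|").map(name => StructField(name.replace(".", "_"), StringType)).toSeq
)

// Toy stand-in for `temp`: only two of the four target columns are present.
val temp = Seq(("4295906830", "344")).toDF("Source_organizationId", "Source_sourceId")

// Add every missing field as a null column cast to the type the schema declares.
val missing = schema.fields.filterNot(f => temp.columns.contains(f.name))
val filled = missing.foldLeft(temp) { (df, field) =>
  df.withColumn(field.name, lit(null).cast(field.dataType))
}

// Reorder the columns to match the target schema exactly.
val mirrored = filled.select(schema.fieldNames.map(col): _*)
mirrored.printSchema()
```

Because the missing columns are computed from whatever temp happens to contain, the same code works whether all, some or none of the optional columns are present in a given input.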
I hope the answer is helpful
Upvotes: 2