I have two data frames on which I perform an outer join. Data frame 1 looks like this:
Source.organizationId|^|Source.sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!|
4295876791|^|162|^|2017-08-10T06:01:00Z|^|YUH|^|44604379|^|yo00196838|^|PDFNTV|^|2017-06-30T00:00:00Z|^|False|^|False|^|2017-06-30T00:00:00Z|^|1.00000|^|False|^|540|^|SS |^|1|^|3013057|^|1000716240|^|I|!|
4295877415|^|167|^|2005-03-31T00:00:00Z|^|ESGWEB|^||^||^||^|2005-03-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|168|^|2010-03-31T00:00:00Z|^|ESGWEB|^||^||^||^|2010-03-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|169|^|2007-03-31T00:00:00Z|^|ESGWEB|^||^||^||^|2007-03-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|170|^|2014-12-31T00:00:00Z|^|ESGWEB|^||^||^||^|2014-12-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|171|^|2012-12-31T00:00:00Z|^|ESGWEB|^||^||^||^|2012-12-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|172|^|2009-03-31T00:00:00Z|^|ESGWEB|^||^||^||^|2009-03-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|194|^|2015-03-30T00:00:00Z|^|ESGWEB|^||^||^||^|2013-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|195|^|2008-05-06T00:00:00Z|^|ESGWEB|^||^||^||^|2008-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|214|^|2012-03-08T00:00:00Z|^|ESGWEB|^||^||^||^|2011-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|215|^|2004-06-30T00:00:00Z|^|ESGWEB|^||^||^||^|2004-01-01T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|216|^|2012-06-25T00:00:00Z|^|ESGWEB|^||^||^||^|2011-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|217|^|2014-01-14T00:00:00Z|^|ESGWEB|^||^||^||^|2014-01-05T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|218|^|2008-05-09T00:00:00Z|^|ESGWEB|^||^||^||^|2007-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|219|^|2010-12-09T00:00:00Z|^|ESGWEB|^||^||^||^|2011-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|220|^|2011-06-29T00:00:00Z|^|ESGWEB|^||^||^||^|2011-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|221|^|2013-06-29T00:00:00Z|^|ESGWEB|^||^||^||^|2014-01-05T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|222|^|2015-02-23T00:00:00Z|^|ESGWEB|^||^||^||^|2014-01-05T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|223|^|2013-05-31T00:00:00Z|^|ESGWEB|^||^||^||^|2014-01-05T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|224|^|2012-03-20T00:00:00Z|^|ESGWEB|^||^||^||^|2011-01-31T00:00:00Z|^|True|^|False|^||^||^|False|^|0|^|ESG|^||^|1002198005|^||^|I|!|
4295877415|^|229|^|2015-12-31T00:00:00Z|^|ESGWEB|^||^||^||^|2015-12-31T00:00:00Z|^|True|^|False|^||^|1.00000|^|False|^|0|^|ATD|^||^|1002198005|^||^|I|!|
Data frame 2 looks like this:
DataPartition_1|^|TimeStamp|^|Source.organizationId|^|Source.sourceId|^|FilingDateTime_1|^|SourceTypeCode_1|^|DocumentId_1|^|Dcn_1|^|DocFormat_1|^|StatementDate_1|^|IsFilingDateTimeEstimated_1|^|ContainsPreliminaryData_1|^|CapitalChangeAdjustmentDate_1|^|CumulativeAdjustmentFactor_1|^|ContainsRestatement_1|^|FilingDateTimeUTCOffset_1|^|ThirdPartySourceCode_1|^|ThirdPartySourcePriority_1|^|SourceTypeId_1|^|ThirdPartySourceCodeId_1|^|FFAction_1
SelfSourcedPublic|^|1512723204932|^|4295859031|^|59|^|2017-04-04T18:00:00+00:00|^|10Q|^|null|^|null|^|null|^|2017-03-31T00:00:00+00:00|^|false|^|false|^|2017-03-31T00:00:00+00:00|^|1.00000|^|false|^|-360|^|SS|^|1|^|3011836|^|1000716240|^|I|!|
And here are the schemas of both data frames.
First schema root:
|-- Source_organizationId: long (nullable = true)
|-- Source_sourceId: integer (nullable = true)
|-- FilingDateTime: string (nullable = true)
|-- SourceTypeCode: string (nullable = true)
|-- DocumentId: integer (nullable = true)
|-- Dcn: string (nullable = true)
|-- DocFormat: string (nullable = true)
|-- StatementDate: string (nullable = true)
|-- IsFilingDateTimeEstimated: boolean (nullable = true)
|-- ContainsPreliminaryData: boolean (nullable = true)
|-- CapitalChangeAdjustmentDate: string (nullable = true)
|-- CumulativeAdjustmentFactor: string (nullable = true)
|-- ContainsRestatement: boolean (nullable = true)
|-- FilingDateTimeUTCOffset: integer (nullable = true)
|-- ThirdPartySourceCode: string (nullable = true)
|-- ThirdPartySourcePriority: integer (nullable = true)
|-- SourceTypeId: integer (nullable = true)
|-- ThirdPartySourceCodeId: integer (nullable = true)
|-- FFAction: string (nullable = true)
|-- DataPartition: string (nullable = true)
Second schema root:
|-- DataPartition_1: string (nullable = true)
|-- Source_organizationId: long (nullable = true)
|-- Source_sourceId: integer (nullable = true)
|-- FilingDateTime_1: string (nullable = true)
|-- SourceTypeCode_1: string (nullable = true)
|-- DocumentId_1: string (nullable = true)
|-- Dcn_1: string (nullable = true)
|-- DocFormat_1: string (nullable = true)
|-- StatementDate_1: string (nullable = true)
|-- IsFilingDateTimeEstimated_1: boolean (nullable = true)
|-- ContainsPreliminaryData_1: boolean (nullable = true)
|-- CapitalChangeAdjustmentDate_1: string (nullable = true)
|-- CumulativeAdjustmentFactor_1: string (nullable = true)
|-- ContainsRestatement_1: boolean (nullable = true)
|-- FilingDateTimeUTCOffset_1: integer (nullable = true)
|-- ThirdPartySourceCode_1: string (nullable = true)
|-- ThirdPartySourcePriority_1: integer (nullable = true)
|-- SourceTypeId_1: integer (nullable = true)
|-- ThirdPartySourceCodeId_1: integer (nullable = true)
|-- FFAction_1: string (nullable = true)
Now when I perform the outer join, a few columns are missing from the output.
Here is the sample output:
Source.organizationId|^|Source.sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!|
4295877415|^|217|^|2014-01-14T00:00:00Z|^|ESGWEB|^||^||^||^|2014-01-05T00:00:00Z|^|true|^|false|^||^||^|false|^|0|^|ESG|^|1002198005|^|I|!|
4295877415|^|171|^|2012-12-31T00:00:00Z|^|ESGWEB|^||^||^||^|2012-12-31T00:00:00Z|^|true|^|false|^||^||^|false|^|0|^|ESG|^|1002198005|^|I|!|
4295877415|^|167|^|2005-03-31T00:00:00Z|^|ESGWEB|^||^||^||^|2005-03-31T00:00:00Z|^|true|^|false|^||^||^|false|^|0|^|ESG|^|1002198005|^|I|!|
4295877415|^|219|^|2010-12-09T00:00:00Z|^|ESGWEB|^||^||^||^|2011-01-31T00:00:00Z|^|true|^|false|^||^||^|false|^|0|^|ESG|^|1002198005|^|I|!|
So here ThirdPartySourceCodeId and ThirdPartySourcePriority are missing wherever they are blank in the first data frame, for example in the second row of the first data set.
There are 19 columns in the first data frame, but in the output I get only 17 columns.
Here is the full code that generates the output:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
import org.apache.spark.sql.functions._
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsmallfffile/FinancialSource/MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalwithTimestamp=df1resultFinal
.withColumn("FilingDateTime",date_format(col("FilingDateTime"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("StatementDate",date_format(col("StatementDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("CapitalChangeAdjustmentDate",date_format(col("CapitalChangeAdjustmentDate"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("CumulativeAdjustmentFactor", format_number(col("CumulativeAdjustmentFactor"), 5))
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsmallfffile/FinancialSource/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
val df2resultTimestamp=df2result
.withColumn("FilingDateTime_1",date_format(col("FilingDateTime_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("StatementDate_1",date_format(col("StatementDate_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("CapitalChangeAdjustmentDate_1",date_format(col("CapitalChangeAdjustmentDate_1"), "yyyy-MM-dd'T'HH:mm:ss'Z'"))
.withColumn("CumulativeAdjustmentFactor_1", format_number(col("CumulativeAdjustmentFactor_1"), 5))
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("Source_organizationId", "Source_sourceId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2resultTimestamp.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinalwithTimestamp.join(latestForEachKey, Seq("Source_organizationId", "Source_sourceId"), "outer")
.select($"Source_organizationId", $"Source_sourceId",
when($"FilingDateTime_1".isNotNull, $"FilingDateTime_1").otherwise($"FilingDateTime").as("FilingDateTime"),
when($"SourceTypeCode_1".isNotNull, $"SourceTypeCode_1").otherwise($"SourceTypeCode").as("SourceTypeCode"),
when($"DocumentId_1".isNotNull, $"DocumentId_1").otherwise($"DocumentId").as("DocumentId"),
when($"Dcn_1".isNotNull, $"Dcn_1").otherwise($"Dcn").as("Dcn"),
when($"DocFormat_1".isNotNull, $"DocFormat_1").otherwise($"DocFormat").as("DocFormat"),
when($"StatementDate_1".isNotNull, $"StatementDate_1").otherwise($"StatementDate").as("StatementDate"),
when($"IsFilingDateTimeEstimated_1".isNotNull, $"IsFilingDateTimeEstimated_1").otherwise($"IsFilingDateTimeEstimated").as("IsFilingDateTimeEstimated"),
when($"ContainsPreliminaryData_1".isNotNull, $"ContainsPreliminaryData_1").otherwise($"ContainsPreliminaryData").as("ContainsPreliminaryData"),
when($"CapitalChangeAdjustmentDate_1".isNotNull, $"CapitalChangeAdjustmentDate_1").otherwise($"CapitalChangeAdjustmentDate").as("CapitalChangeAdjustmentDate"),
when($"CumulativeAdjustmentFactor_1".isNotNull, $"CumulativeAdjustmentFactor_1").otherwise($"CumulativeAdjustmentFactor").as("CumulativeAdjustmentFactor"),
when($"ContainsRestatement_1".isNotNull, $"ContainsRestatement_1").otherwise($"ContainsRestatement").as("ContainsRestatement"),
when($"FilingDateTimeUTCOffset_1".isNotNull, $"FilingDateTimeUTCOffset_1").otherwise($"FilingDateTimeUTCOffset").as("FilingDateTimeUTCOffset"),
when($"ThirdPartySourceCode_1".isNotNull, $"ThirdPartySourceCode_1").otherwise($"ThirdPartySourceCode").as("ThirdPartySourceCode"),
when($"ThirdPartySourcePriority_1".isNotNull, $"ThirdPartySourcePriority_1").otherwise($"ThirdPartySourcePriority").as("ThirdPartySourcePriority"),
when($"SourceTypeId_1".isNotNull, $"SourceTypeId_1").otherwise($"SourceTypeId").as("SourceTypeId"),
when($"ThirdPartySourceCodeId_1".isNotNull, $"ThirdPartySourceCodeId_1").otherwise($"ThirdPartySourceCodeId").as("ThirdPartySourceCodeId"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"),
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"))
.filter(!$"FFAction".contains("D"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c"))).toSeq
val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|")
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition")
.format("csv")
.option("nullValue", "")
.option("delimiter", ";")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsmallfffile/FinancialSource/output")
Both of your columns, ThirdPartySourceCodeId and ThirdPartySourcePriority, are of IntegerType after the join. na.fill("") only applies to string columns, so it did not fill them, and when you then used concat_ws, all the remaining null integer values were silently dropped from the concatenated output.
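You can see this behavior in isolation. Here is a minimal sketch (assuming a running SparkSession named spark, with hypothetical column names intCol and strCol), showing that na.fill("") leaves integer columns null and that concat_ws skips nulls:

```scala
import spark.implicits._
import org.apache.spark.sql.functions.concat_ws

// One row with values, one row with nulls in both columns.
val df = Seq(
  (Option(1), Option("a")),
  (Option.empty[Int], Option.empty[String])
).toDF("intCol", "strCol")

// na.fill("") fills only strCol; intCol stays null because "" is not an integer.
val filled = df.na.fill("")

// concat_ws skips null inputs along with their delimiter, so the null intCol
// simply disappears from the concatenated string: the column looks "missing".
filled.select(concat_ws("|^|", $"intCol", $"strCol").as("concatenated")).show()
```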
The solution is to cast the two columns to StringType before applying na.fill. So change
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
to
val dfMainOutputFinal = dfMainOutput.withColumn("ThirdPartySourcePriority", $"ThirdPartySourcePriority".cast(StringType)).withColumn("ThirdPartySourceCodeId", $"ThirdPartySourceCodeId".cast(StringType)).na.fill("").select($"DataPartition",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
and that should solve your issue.
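If more non-string columns run into the same problem later, one option (a hedged sketch, not tested against your exact pipeline) is to cast every non-string column to string in one pass before na.fill(""), instead of casting columns one by one:

```scala
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.StringType

// Cast every non-string column of dfMainOutput to string so that
// na.fill("") covers all of them uniformly.
val allStrings = dfMainOutput.schema.fields.foldLeft(dfMainOutput) { (df, f) =>
  if (f.dataType == StringType) df
  else df.withColumn(f.name, col(f.name).cast(StringType))
}

val dfMainOutputFinal = allStrings.na.fill("")
  .select($"DataPartition",
    concat_ws("|^|",
      allStrings.schema.fieldNames.filter(_ != "DataPartition").map(col): _*
    ).as("concatenated"))
```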