Reputation: 752
I'm using Spark 1.3.1, where joining two dataframes repeats the column(s) being
joined. I'm left outer joining two dataframes and want to send the resulting
dataframe to the na().fill() method to convert nulls to known values based on
the data type of the column. I've built a map of "table.column" -> "value" and
pass that to the fill method, but I get an exception instead of success :(.
What are my options? I see that there is a dataFrame.withColumnRenamed method,
but it can only rename one column at a time, and my joins involve more than one
column. Do I just have to ensure that there is a unique set of column names,
regardless of table aliases, in the dataframe where I apply the na().fill()
method?
Given:
scala> val df1 = sqlContext.jsonFile("people.json").as("df1")
df1: org.apache.spark.sql.DataFrame = [first: string, last: string]
scala> val df2 = sqlContext.jsonFile("people.json").as("df2")
df2: org.apache.spark.sql.DataFrame = [first: string, last: string]
I can join them together with
val df3 = df1.join(df2, df1("first") === df2("first"), "left_outer")
And I have a map from qualified column name to replacement value.
scala> val map = Map("df1.first"->"unknown", "df1.last" -> "unknown",
"df2.first" -> "unknown", "df2.last" -> "unknown")
But executing fill(map) results in an exception.
scala> df3.na.fill(map)
org.apache.spark.sql.AnalysisException: Reference 'first' is ambiguous,
could be: first#6, first#8.;
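One workaround I can see is to keep references to the original dataframes and alias the duplicated columns away before calling na.fill, roughly like this (the df1_/df2_ aliases are illustrative):

// Disambiguate through the original dataframe references and alias
// the duplicates, so column names after the select are unique.
val deduped = df3.select(
  df1("first").as("df1_first"), df1("last").as("df1_last"),
  df2("first").as("df2_first"), df2("last").as("df2_last"))
deduped.na.fill(Map("df2_first" -> "unknown", "df2_last" -> "unknown"))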
Upvotes: 3
Views: 2005
Reputation: 752
Here is what I came up with. In my original example, there is nothing interesting left in df2 after the join, so I changed this to the classical department / employee example.
department.json
{"department": 2, "name":"accounting"}
{"department": 1, "name":"engineering"}
people.json
{"department": 1, "first":"Bruce", "last": "szalwinski"}
And now I can join the dataframes, build the map, and replace nulls with unknowns.
scala> val df1 = sqlContext.jsonFile("department.json").as("df1")
df1: org.apache.spark.sql.DataFrame = [department: bigint, name: string]
scala> val df2 = sqlContext.jsonFile("people.json").as("df2")
df2: org.apache.spark.sql.DataFrame = [department: bigint, first: string, last: string]
scala> val df3 = df1.join(df2, df1("department") === df2("department"), "left_outer")
df3: org.apache.spark.sql.DataFrame = [department: bigint, name: string, department: bigint, first: string, last: string]
scala> val map = Map("first" -> "unknown", "last" -> "unknown")
map: scala.collection.immutable.Map[String,String] = Map(first -> unknown, last -> unknown)
scala> val df4 = df3.select("df1.department", "df2.first", "df2.last").na.fill(map)
df4: org.apache.spark.sql.DataFrame = [department: bigint, first: string, last: string]
scala> df4.show()
+----------+-------+----------+
|department| first| last|
+----------+-------+----------+
| 2|unknown| unknown|
| 1| Bruce|szalwinski|
+----------+-------+----------+
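To get back to my original goal of picking fill values by data type, a sketch like the following could derive the map from the schema; the defaults of "unknown" for strings and 0 for bigints are illustrative assumptions, not canonical choices:

import org.apache.spark.sql.types.{LongType, StringType}

// Derive one fill value per column from its data type; string columns
// get "unknown" and bigint (LongType) columns get 0L. Both defaults
// are assumptions chosen for illustration.
val fillMap: Map[String, Any] = df4.schema.fields.flatMap { f =>
  f.dataType match {
    case StringType => Some(f.name -> "unknown")
    case LongType   => Some(f.name -> 0L)
    case _          => None
  }
}.toMap
df4.na.fill(fillMap)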
Upvotes: 3