bruce szalwinski

Reputation: 752

Issue with DataFrame na.fill() and ambiguous column references

I'm using Spark 1.3.1, where joining two DataFrames repeats the column(s) being joined on. I'm left outer joining two DataFrames and want to send the resulting DataFrame to the na().fill() method to convert nulls to known values, based on the data type of the column. I've built a map of "table.column" -> "value" and pass that to the fill method, but I get an exception instead of success. What are my options? I see that there is a dataFrame.withColumnRenamed method, but it can only rename one column at a time, and I have joins that involve more than one column. Do I just have to ensure that there is a unique set of column names, regardless of table aliases, in the DataFrame where I apply the na().fill() method?

Given:

scala> val df1 = sqlContext.jsonFile("people.json").as("df1")
df1: org.apache.spark.sql.DataFrame = [first: string, last: string]

scala> val df2 = sqlContext.jsonFile("people.json").as("df2")
df2: org.apache.spark.sql.DataFrame = [first: string, last: string]

I can join them together with

val df3 = df1.join(df2, df1("first") === df2("first"), "left_outer")

And I have a map of qualified column names to fill values.

scala> val map = Map("df1.first"->"unknown", "df1.last" -> "unknown",
"df2.first" -> "unknown", "df2.last" -> "unknown")

But executing fill(map) results in an exception.

scala> df3.na.fill(map)

org.apache.spark.sql.AnalysisException: Reference 'first' is ambiguous,
could be: first#6, first#8.;
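For reference, the rename workaround mentioned above can be made to cover several columns: `withColumnRenamed` handles one column per call, but a rename map can be folded over a DataFrame, e.g. `renames.foldLeft(df) { case (d, (from, to)) => d.withColumnRenamed(from, to) }`. A minimal sketch of the name mapping itself, on a plain list of column names so it runs without Spark (all names here are hypothetical):

```scala
// Hypothetical sketch: apply a Map of renames to a list of column names.
// With a real DataFrame the same map would be folded over withColumnRenamed.
object RenameSketch {
  def renameAll(cols: Seq[String], renames: Map[String, String]): Seq[String] =
    cols.map(c => renames.getOrElse(c, c))

  def main(args: Array[String]): Unit = {
    val df2Cols = Seq("first", "last")
    val renames = Map("first" -> "df2_first", "last" -> "df2_last")
    println(renameAll(df2Cols, renames)) // List(df2_first, df2_last)
  }
}
```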

Upvotes: 3

Views: 2005

Answers (1)

bruce szalwinski

Reputation: 752

Here is what I came up with. In my original example, there is nothing interesting left in df2 after the join, so I changed it to the classical department / employee example.

department.json

{"department": 2, "name":"accounting"}
{"department": 1, "name":"engineering"}

person.json

{"department": 1, "first":"Bruce", "last": "szalwinski"}

And now I can join the dataframes, build the map, and replace nulls with unknowns.

scala> val df1 = sqlContext.jsonFile("department.json").as("df1")
df1: org.apache.spark.sql.DataFrame = [department: bigint, name: string]

scala> val df2 = sqlContext.jsonFile("person.json").as("df2")
df2: org.apache.spark.sql.DataFrame = [department: bigint, first: string, last: string]

scala> val df3 = df1.join(df2, df1("department") === df2("department"), "left_outer")
df3: org.apache.spark.sql.DataFrame = [department: bigint, name: string, department: bigint, first: string, last: string]

scala> val map = Map("first" -> "unknown", "last" -> "unknown")
map: scala.collection.immutable.Map[String,String] = Map(first -> unknown, last -> unknown)

scala> val df4 = df3.select("df1.department", "df2.first", "df2.last").na.fill(map)
df4: org.apache.spark.sql.DataFrame = [department: bigint, first: string, last: string]

scala> df4.show()
+----------+-------+----------+
|department|  first|      last|
+----------+-------+----------+
|         2|unknown|   unknown|
|         1|  Bruce|szalwinski|
+----------+-------+----------+
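The question also mentions choosing the fill value from each column's data type. That lookup can be sketched without Spark, modeling column types as plain strings (with a real DataFrame you would walk df.schema instead; the names and defaults here are hypothetical):

```scala
// Hypothetical sketch: build a na.fill-style map from a schema, assigning a
// default per data type. Only string columns get a default here, since
// na.fill requires the fill value's type to match the column's type.
object FillMapSketch {
  val defaults = Map("string" -> "unknown")

  def fillMap(schema: Seq[(String, String)]): Map[String, String] =
    schema.collect { case (name, tpe) if defaults.contains(tpe) =>
      name -> defaults(tpe)
    }.toMap

  def main(args: Array[String]): Unit = {
    val schema = Seq("department" -> "bigint", "first" -> "string", "last" -> "string")
    println(fillMap(schema)) // Map(first -> unknown, last -> unknown)
  }
}
```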

Upvotes: 3
