Reputation: 5917
Hi, I have two DataFrames to join:
#df1
name genre count
satya drama 1
satya action 3
abc drama 2
abc comedy 2
def romance 1
#df2
name max_count
satya 3
abc 2
def 1
Now I want to join the above two DataFrames on name and count == max_count, but I am getting an error:
import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
from pyspark.sql.functions import struct
df = spark.read.csv('file',sep = '###', header=True)
df1 = df.groupBy("name", "genre").count()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))
final_df.show() ###Error
#py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
#Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: count(1)
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
But it succeeds with a "left" join:
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left")
final_df.show() ### Succeeds, but I want an inner join, not a left join
My question is: why does the first one fail? Am I doing something wrong there?
I referred to the link "Find maximum row per group in Spark DataFrame" and used the first answer (the two-groupBy method), but got the same error.
I am on spark-2.0.0-bin-hadoop2.7 with Python 2.7.
Please suggest. Thanks.
The above scenario works with Spark 1.6, which is quite surprising; what's wrong with Spark 2.0 (or with my installation)? I will reinstall, check, and update here.
Has anybody tried this on Spark 2.0 and succeeded, following Yaron's answer below?
Upvotes: 0
Views: 10109
Reputation: 31
I ran into the same problem when I tried to join two DataFrames where one of them was the result of a groupBy aggregation. It worked for me when I cached the aggregated DataFrame before the inner join. For your code, try:
df1 = df.groupBy("name", "genre").count().cache() # added cache()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count")).cache() # added cache()
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count)) # no change
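A possible explanation (my guess, not verified against the Spark source): cache() materializes df1's aggregation result, so the analyzer no longer has to resolve the raw count(1) expression on both sides of what is effectively a self-join.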
Upvotes: 3
Reputation: 5917
I created a single column ('combined') from the columns used in the join comparison ('name', 'mycount') in the respective DataFrames, so now I have just one column to compare, and comparing a single column does not trigger the issue.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def combine_func(*args):
    return '_'.join([str(x) for x in args])  ### convert non-strings to str, then concatenate

combine_udf = udf(combine_func, StringType())  ## register the function as a UDF
df1 = df1.withColumn('combined_new_1', combine_udf(df1['name'], df1['mycount']))  ### column with the concatenated value of name and mycount, e.g. 'satya_3'
df2 = df2.withColumn('combined_new_2', combine_udf(df2['name2'], df2['max_count']))
#df1.columns == 'name','genre', 'mycount', 'combined_new_1'
#df2.columns == 'name2', 'max_count', 'combined_new_2'
#Now join
final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner')
#final_df = df1.join(df2,df1.combined_new_1 == df2.combined_new_2, 'inner').select('the columns you want')
final_df.show() #### It shows the result, trust me.
Please don't follow this unless you are in a hurry; better to search for a reliable solution.
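That said, if you do go the composite-key route, Spark's built-in concat_ws (available since 1.5) can build the same key without a Python UDF, which keeps the work in the JVM. A sketch of mine, under the same column names as above:
from pyspark.sql import functions as F
# Same composite-key idea, but with the built-in concat_ws instead of a UDF.
# The explicit cast to string mirrors the str(x) conversion inside the UDF.
df1 = df1.withColumn('combined_new_1', F.concat_ws('_', df1['name'], df1['mycount'].cast('string')))
df2 = df2.withColumn('combined_new_2', F.concat_ws('_', df2['name2'], df2['max_count'].cast('string')))
final_df = df1.join(df2, df1.combined_new_1 == df2.combined_new_2, 'inner')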
Upvotes: 0
Reputation: 10450
Update: It seems your code was also failing because "count" was used as a column name: count collides with the DataFrame method count(), so attribute access like df1.count resolves to the method rather than the column. Renaming count to "mycount" solved the problem. The working code below was modified to run on Spark 1.5.2, which I used to test your issue.
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/fac_cal.csv")
df1 = df.groupBy("name", "genre").count()
df1 = df1.select(col("name"),col("genre"),col("count").alias("mycount"))
df2 = df1.groupby('name').agg(F.max("mycount").alias("max_count"))
df2 = df2.select(col('name').alias('name2'),col("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2,[df1.name == df2.name2 , df1.mycount == df2.max_count])
final_df.show()
+-----+---------+-------+-----+---------+
| name| genre|mycount|name2|max_count|
+-----+---------+-------+-----+---------+
|brata| comedy| 2|brata| 2|
|brata| drama| 2|brata| 2|
|panda|adventure| 1|panda| 1|
|panda| romance| 1|panda| 1|
|satya| action| 3|satya| 3|
+-----+---------+-------+-----+---------+
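As an aside, the name collision is easy to see from a shell: attribute access returns the DataFrame method, while bracket access returns the Column. An illustration of mine, assuming the df1 from the question before the rename:
df1.count       # <bound method DataFrame.count of DataFrame[...]> -- the method shadows the column
df1['count']    # Column<count> -- bracket access still reaches the column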
The example for a complex join condition from https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html:
>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
Can you try:
final_df = df1.join(df2, [df1.name == df2.name , df1.mycount == df2.max_count])
Note also that, according to the spec, "left" is not among the valid join types: how – str, default 'inner'. One of inner, outer, left_outer, right_outer, leftsemi.
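For completeness, a window-function alternative to the two-groupBy approach avoids the second DataFrame and the join (and its resolution pitfalls) altogether. A sketch of mine, assuming Spark 2.0 (on 1.5/1.6 it needs a HiveContext):
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Compute the per-name maximum alongside every row, then keep the rows that reach it.
w = Window.partitionBy('name')
final_df = (df1
    .withColumn('max_count', F.max('mycount').over(w))
    .where(F.col('mycount') == F.col('max_count')))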
Upvotes: 2