Nate Reed

Reputation: 7011

How to reference an aliased aggregate function applied to a column in a join condition

I'm applying an aggregate function (max) to a column, which I'm then referencing in a join.

The column becomes max(column_name) in the resulting DataFrame, so to make it easier to reference with Python's dot notation I aliased the column, but I'm still getting an error:

from pyspark.sql.functions import max

tmp = hiveContext.sql("SELECT * FROM s3_data.nate_glossary "
                      "WHERE profile_id_guid='ffaff64b-e87c-4a43-b593-b0e4bccc2731'")

max_processed = tmp.groupby('profile_id_guid', 'profile_version_id', 'record_type',
                            'scope', 'item_id', 'attribute_key') \
    .agg(max("processed_date").alias("max_processed_date"))

df = max_processed.join(tmp, [max_processed.profile_id_guid == tmp.profile_id_guid,
                              max_processed.profile_version_id == tmp.profile_version_id,
                              max_processed.record_type == tmp.record_type,
                              max_processed.scope == tmp.scope,
                              max_processed.item_id == tmp.item_id,
                              max_processed.attribute_key == tmp.attribute_key,
                              max_processed.max_processed_date == tmp.processed_date])

The error:

File "", line 7, in File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/dataframe.py", line 650, in join jdf = self._jdf.join(other._jdf, on._jc, "inner") File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in call File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco raise AnalysisException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.AnalysisException: u'resolved attribute(s) processed_date#10 missing from record_type#41,scope#4,item_id#5,profile_id_guid#1,data_type#44,attribute_value#47,logical_id#45,profile_version_id#40,profile_version_id#2,attribute_key#8,max_processed_date#37,attribute_key#46,processed_date#48,scope#42,record_type#3,item_id#43,profile_id_guid#39,ems_system_id#38 in operator !Join Inner, Some((((((((profile_id_guid#1 = profile_id_guid#1) && (profile_version_id#2 = profile_version_id#2)) && (record_type#3 = record_type#3)) && (scope#4 = scope#4)) && (item_id#5 = item_id#5)) && (attribute_key#8 = attribute_key#8)) && (max_processed_date#37 = processed_date#10)));'

Note the error message: "processed_date#10 missing". The attribute list contains processed_date#48, but the join condition references processed_date#10.

Upvotes: 0

Views: 724

Answers (1)

Zhang Tong

Reputation: 4719

See:

# DataFrame transformation
tmp -> max_processed -> df

The above three DataFrames share the same lineage, so if you want to use the same column more than once, you need to alias or rename it.

For example:

from pyspark.sql import functions as f

tmp = spark.createDataFrame([(1, 3, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])

max_processed = tmp.groupBy(['key1', 'key2']).agg(f.max(tmp['val']).alias('max_val')) \
    .withColumnRenamed('key1', 'max_key1').withColumnRenamed('key2', 'max_key2')

df = max_processed.join(tmp, on=[max_processed['max_key1'] == tmp['key1'],
                                 max_processed['max_key2'] == tmp['key2'],
                                 max_processed['max_val'] == tmp['val']])
df.show()

+--------+--------+-------+----+----+---+                                       
|max_key1|max_key2|max_val|key1|key2|val|
+--------+--------+-------+----+----+---+
|       1|       3|      1|   1|   3|  1|
|       2|       3|      1|   2|   3|  1|
+--------+--------+-------+----+----+---+
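
Applied to the column names from your question, the same renaming idea would look roughly like this (just a sketch, assuming tmp is built with hiveContext.sql as in your post):

from pyspark.sql import functions as f

group_cols = ['profile_id_guid', 'profile_version_id', 'record_type',
              'scope', 'item_id', 'attribute_key']

# Aggregate, then rename every grouping column so the two sides of the
# self-join no longer share attribute names from the same lineage.
max_processed = tmp.groupBy(group_cols) \
    .agg(f.max('processed_date').alias('max_processed_date'))
for c in group_cols:
    max_processed = max_processed.withColumnRenamed(c, 'max_' + c)

cond = [max_processed['max_' + c] == tmp[c] for c in group_cols]
cond.append(max_processed['max_processed_date'] == tmp['processed_date'])

df = max_processed.join(tmp, on=cond)

After the rename there is only one processed_date and one copy of each key column on either side of the join, so the analyzer can resolve every reference.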

To be honest, I still think this is a defect in Spark's lineage handling.

Upvotes: 1
