Reputation: 7011
I'm applying an aggregate function (max) to a column, which I'm then referencing in a join.
The column becomes max(column_name) in the data frame. So, to make it easier to reference using Python's dot notation, I aliased the column, but I'm still getting an error:
tmp = hiveContext.sql("SELECT * FROM s3_data.nate_glossary WHERE profile_id_guid='ffaff64b-e87c-4a43-b593-b0e4bccc2731'")
max_processed = tmp.groupby('profile_id_guid','profile_version_id','record_type','scope','item_id','attribute_key') \
    .agg(max("processed_date").alias("max_processed_date"))
df = max_processed.join(tmp, [max_processed.profile_id_guid == tmp.profile_id_guid,
                              max_processed.profile_version_id == tmp.profile_version_id,
                              max_processed.record_type == tmp.record_type,
                              max_processed.scope == tmp.scope,
                              max_processed.item_id == tmp.item_id,
                              max_processed.attribute_key == tmp.attribute_key,
                              max_processed.max_processed_date == tmp.processed_date])
The error:
File "", line 7, in
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/dataframe.py", line 650, in join
    jdf = self._jdf.join(other._jdf, on._jc, "inner")
File "/usr/hdp/2.5.0.0-1245/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
File "/usr/hdp/2.5.0.0-1245/spark/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'resolved attribute(s) processed_date#10 missing from record_type#41,scope#4,item_id#5,profile_id_guid#1,data_type#44,attribute_value#47,logical_id#45,profile_version_id#40,profile_version_id#2,attribute_key#8,max_processed_date#37,attribute_key#46,processed_date#48,scope#42,record_type#3,item_id#43,profile_id_guid#39,ems_system_id#38 in operator !Join Inner, Some((((((((profile_id_guid#1 = profile_id_guid#1) && (profile_version_id#2 = profile_version_id#2)) && (record_type#3 = record_type#3)) && (scope#4 = scope#4)) && (item_id#5 = item_id#5)) && (attribute_key#8 = attribute_key#8)) && (max_processed_date#37 = processed_date#10)));'
Note the error message: "processed_date#10 missing". I see processed_date#48 and processed_date#10 in the list of attributes.
Upvotes: 0
Views: 724
Reputation: 4719
See:
# DataFrame transformation
tmp -> max_processed -> df
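The three DataFrames above share the same lineage, so columns on both sides of the join resolve to the same attribute IDs (note conditions such as profile_id_guid#1 = profile_id_guid#1 in the error message). If you want to use the same column more than once, you need to alias it.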
For example:
from pyspark.sql import functions as f

tmp = spark.createDataFrame([(1, 3, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
# Rename the grouping keys so they no longer clash with the columns in tmp
max_processed = tmp.groupBy(['key1', 'key2']).agg(f.max(tmp['val']).alias('max_val'))\
    .withColumnRenamed('key1', 'max_key1').withColumnRenamed('key2', 'max_key2')
df = max_processed.join(tmp, on=[max_processed['max_key1'] == tmp['key1'],
max_processed['max_key2'] == tmp['key2'],
max_processed['max_val'] == tmp['val']])
df.show()
+--------+--------+-------+----+----+---+
|max_key1|max_key2|max_val|key1|key2|val|
+--------+--------+-------+----+----+---+
|       1|       3|      1|   1|   3|  1|
|       2|       3|      1|   2|   3|  1|
+--------+--------+-------+----+----+---+
To be honest, I still think this is a defect in how Spark handles lineage.
Upvotes: 1