Reputation: 2182
I am trying to create new columns in a Spark SQL dataframe that compare two columns within the dataframe, and return True if they are equal and False otherwise. I have to do this for a dataset with thousands of columns. As a sample problem, I've included all of my code here. However, the important problem comes in the second for loop at the end of the code block.
from pyspark.sql import SQLContext
from pyspark.sql.types import *
data = sc.parallelize([[1, None, 'BND'], [2, None, 'RMP'], [3, None, 'SWP'], [4, None, "IRS"], [5, None, "SWP"], [6, None, "IRS"]])
match = sc.parallelize([[1, 2, 100], [3, 5, 101], [4, 6, 102]])
trade_schema_string = 'trade_id,match_id,product'
trade_fields = [StructField(field_name, StringType(), True) for field_name in trade_schema_string.split(',')]
trade_fields[0].dataType = IntegerType()
trade_fields[1].dataType = IntegerType()
trade_schema = StructType(trade_fields)
match_schema_string = "pri_netting_id,sec_netting_id,match_id"
match_fields = [StructField(field_name, IntegerType(), True) for field_name in match_schema_string.split(',')]
match_schema = StructType(match_fields)
sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(data, trade_schema)
odf = sqlContext.createDataFrame(match, match_schema)
df.registerTempTable("trade")
odf.registerTempTable("match")
# Get match_ids so you can match up front office and back office records
# Change column names for fo and bo dataframes so that they say "bo_product" and "fo_product", etc.
fo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.pri_netting_id")
bo = sqlContext.sql("SELECT t.trade_id,t.product,m.match_id FROM trade t INNER JOIN match m WHERE t.trade_id = m.sec_netting_id")
col_names = fo.columns
for x in range(0, len(col_names)):
    col_name = col_names[x]
    fo = fo.withColumnRenamed(col_name, "fo_" + col_name)
    bo = bo.withColumnRenamed(col_name, "bo_" + col_name)
fo.registerTempTable("front_office")
bo.registerTempTable("back_office")
fobo = sqlContext.sql("SELECT f.fo_trade_id,f.fo_product,b.bo_trade_id,b.bo_product FROM front_office f INNER JOIN back_office b WHERE f.fo_match_id = b.bo_match_id")
fobo = fobo.repartition(5)
# How to create diff columns
num_cols = len(fobo.columns)
fobo_names = fobo.columns
first = fobo.first()
for x in range(0, num_cols / 2):
    new_name = "\'diff_" + fobo_names[x][3:] + "\'"
    old_column_fo = "fobo." + fobo_names[x]
    old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
    fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
The error I get is:
Traceback (most recent call last):
  File "<stdin>", line 8, in <module>
  File "/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/spark/python/pyspark/sql/dataframe.py", line 695, in withColumn
    return self.select('*', col.alias(colName))
AttributeError: 'bool' object has no attribute 'alias'
So, the strange thing is that if I execute the following by hand:
fobo = fobo.withColumn("diff_product", fobo.fo_product == fobo.bo_product)
and
fobo = fobo.withColumn("diff_trade_id", fobo.fo_trade_id == fobo.bo_trade_id)
the whole thing works perfectly. However, this isn't practical for my true use case, which has many columns.
Upvotes: 0
Views: 760
Reputation: 76184
old_column_fo = "fobo." + fobo_names[x]
old_column_bo = "fobo." + fobo_names[x + (num_cols / 2)]
fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
old_column_fo and old_column_bo will be strings that merely look like the attribute names you're trying to access, but they won't be the actual attributes. Try using getattr instead.
old_column_fo = getattr(fobo, fobo_names[x])
old_column_bo = getattr(fobo, fobo_names[x + (num_cols / 2)])
fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
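For completeness, here's a sketch of the whole loop with that fix applied. Two small changes beyond getattr are my own: I've dropped the stray escaped quotes around new_name (they would otherwise end up as literal quote characters inside the column name), and used // so the division also behaves as integer division on Python 3.
num_cols = len(fobo.columns)
fobo_names = fobo.columns
for x in range(0, num_cols // 2):
    # build "diff_<suffix>" from the suffix shared by the fo_/bo_ pair
    new_name = "diff_" + fobo_names[x][3:]
    # getattr returns the actual Column objects, so == builds a Column
    # expression instead of evaluating to a plain Python bool
    old_column_fo = getattr(fobo, fobo_names[x])
    old_column_bo = getattr(fobo, fobo_names[x + (num_cols // 2)])
    fobo = fobo.withColumn(new_name, old_column_fo == old_column_bo)
Bracket indexing (fobo[fobo_names[x]]) would work just as well, and also copes with column names that aren't valid Python identifiers.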
Upvotes: 2