Reputation: 167
I have a vast amount of raw delta tables in Databricks (bronze), and I want to clean them up by only selecting a few columns and renaming them appropriately (before saving them to a new database; silver).
For example:
df.select(col("CUSTOMERNUMBER").alias('CustomerNumber'),
          col("NAME1").alias('CustomerName'),
          col("LANGUAGE").alias('Language'),
          col("TRANSACTIONTIMESTAMP").alias('TransactionTimestamp'))
However, I want to create a function that takes in a df and a mapping and performs the select dynamically.
Something along these lines:
mapCustomer = {'CUSTOMERNUMBER': 'CustomerNumber',
               'NAME1': 'CustomerName',
               'LANGUAGE': 'Language',
               'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'}

def map_col(df, mapping):
    return df.select(mapping)

mapped_df = map_col(df, mapCustomer)
print(mapped_df)

# save df to new location (Silver layer)
# mapped_df.write.format('delta')....
How can one achieve this?
I am getting an error:
Invalid argument, not a string or column: {'CUSTOMERNUMBER': 'CustomerNumber', 'NAME1': 'CustomerName', 'LANGUAGE': 'Language', 'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'} of type <class 'dict'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
Upvotes: 0
Views: 1835
I found a solution that seems to work:
from pyspark.sql.functions import col

def filter_data(df, mapping):
    # Build a list of aliased Column expressions from the mapping
    mylist = []
    for key in mapping:
        mylist.append(col(key).alias(mapping[key]))
    return df.select(mylist)
mapCustomer = {'CUSTOMERNUMBER': 'CustomerNumber',
               'NAME1': 'CustomerName',
               'LANGUAGE': 'Language',
               'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'}

filtered = filter_data(added_col, mapCustomer)
display(filtered)
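For reference, the same mapping can also be applied without an explicit loop. The sketch below (function names are illustrative, not from the original post) builds `` `OLD` AS `New` `` SQL expression strings from the dict and passes them to `DataFrame.selectExpr`, which accepts such expressions directly. The string-building half is plain Python, so it works for any mapping; the backticks guard column names containing spaces or other special characters:

```python
def build_select_exprs(mapping):
    """Turn {'OLD': 'New', ...} into ['`OLD` AS `New`', ...] for selectExpr."""
    return [f"`{old}` AS `{new}`" for old, new in mapping.items()]

def filter_data_expr(df, mapping):
    # Equivalent to the loop-based filter_data above, via SQL expressions
    return df.selectExpr(*build_select_exprs(mapping))

mapCustomer = {'CUSTOMERNUMBER': 'CustomerNumber',
               'NAME1': 'CustomerName',
               'LANGUAGE': 'Language',
               'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'}

print(build_select_exprs(mapCustomer))
# e.g. ['`CUSTOMERNUMBER` AS `CustomerNumber`', '`NAME1` AS `CustomerName`', ...]
```

Since Python 3.7 dicts preserve insertion order, so the resulting columns come out in the order the mapping was written.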
Upvotes: 1