Reputation: 309
Pardon my ignorance, I am new to pyspark. I'm trying to improve a udf that creates a new column count_adj
based on values from another column, a_type,
using a dictionary. How do I account for None / NULL values in this process when creating the new column? This is easy in pandas (df['adj_count'] = df.a_type.map(count_map)),
but I'm struggling to do the same in pyspark.
Sample data / imports:
# all imports used -- not just for this portion of the script
from pyspark.sql import SparkSession, HiveContext, SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import sql
import pyspark.sql.functions as F
import random
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from datetime import datetime
from datetime import date
from datetime import timedelta
from pyspark.sql import Window
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import rank, row_number, max as max_, col
import sys
import os
from itertools import chain  # needed for the create_map expression used below
spark = SparkSession.builder.appName('a_type_tests').getOrCreate()
# note: sample data has results from the original udf for comparison
dataDictionary = [
(26551, 491, '2022-01-22', '740', -1, 16),
(24192, 338, '2022-01-22', '740', -1, 16),
(26555, 3013, '2022-01-22', '740', -1, 16),
(26571, 937, '2022-01-22', '740', -1, 16),
(24376, 371, '2022-01-22', '740', -1, 16),
(17716, 118, '2022-01-22', '740', -1, 16),
(26554, 3013, '2022-01-22', '740', -1, 16),
(26734, 105, '2022-01-22', '740', -1, 16),
(26051, 415, '2022-01-22', '600', -1, 8),
(26602, 501, '2022-01-22', '740', -1, 16),
(26125, 501, '2022-01-22', None, -1, 0)
]
sdf = spark.createDataFrame(data=dataDictionary, schema = ['id', 'loc_id', 'a_date', 'a_type', 'adj_val', 'udf_original'])
sdf.printSchema()
sdf.show(truncate=False)
The original udf is similar to:
def count_adj(a_type):
    if a_type is None:
        return 0
    elif a_type in ('703', '704', '705', '708', '900', '910'):
        return 4
    elif a_type in ('701', '702'):
        return 2
    elif a_type in ('711', '712'):
        return 1
    elif a_type in ('600', '704'):  # note: '704' already matched above, so this branch only sees '600'
        return 8
    elif a_type in ('740',):  # trailing comma makes a tuple; in ('740') would do substring matching
        return 16
    elif a_type in ('305', '306'):
        return 32
    elif a_type in ('601', '612', '615'):
        return 64
    else:
        return 128
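Since the branch logic is plain Python, it can be sanity-checked locally before wrapping it in a Spark udf. A quick sketch (no Spark required); note that '704' appears in two branches, so the first match wins:

```python
# Pure-Python check of the mapping logic -- no Spark required.
def count_adj(a_type):
    if a_type is None:
        return 0
    elif a_type in ('703', '704', '705', '708', '900', '910'):
        return 4
    elif a_type in ('701', '702'):
        return 2
    elif a_type in ('711', '712'):
        return 1
    elif a_type in ('600', '704'):
        return 8
    elif a_type in ('740',):  # one-element tuple, not substring matching
        return 16
    elif a_type in ('305', '306'):
        return 32
    elif a_type in ('601', '612', '615'):
        return 64
    else:
        return 128

assert count_adj(None) == 0
assert count_adj('740') == 16
assert count_adj('704') == 4   # first matching branch wins
assert count_adj('999') == 128
```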
I've created a dictionary to correspond to these values.
# remove 0:None type pairing because None is not iterable to invert dict
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']
             }
# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}
# create None mapping manually
count_map[None] = 0
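The inversion can be verified in plain Python before it touches Spark. One detail worth noting: '704' appears in both the 4 and the 8 lists, and during inversion the later assignment wins, so the inverted dict maps '704' to 8 while the udf returns 4 for it (first matching elif). A quick check:

```python
# Same mapping as above; verify the inversion and the None entry locally.
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']}
count_map = {c: key for key, vals in count_map.items() for c in vals}
count_map[None] = 0

assert count_map['740'] == 16
assert count_map[None] == 0
assert count_map['704'] == 8  # last write wins during inversion; the udf returns 4 here
```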
Searching SO, I came across this, which resulted in the following error:
# Code Tried:
# Changes None type to NULL -- fine but how do I account for these None/Null Values in my dict?
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])
sdf2 = sdf.withColumn('count_adj', map_expr.getItem(col('a_type')))
# or equivalently:
sdf2 = sdf.withColumn('count_adj', map_expr[col('a_type')])
sdf2.show()
# Error
Py4JJavaError: An error occurred while calling o334.showString.
: java.lang.RuntimeException: Cannot use null as map key.
How do I account for None / NULL types when using a dictionary to create a new column based on values from another column? Is it possible to include a NULL check in my map expression or something else entirely?
Upvotes: 1
Views: 1893
Reputation: 309
For completeness: I removed the None entry from the dictionary and used a method similar to Karthik's answer, combined with the other SO posts mentioned in the question.
My final solution relies on the code below, using .when() and .isNull() to handle the None / NULL conversion.
# Original Mapping
# remove 0:None type pairing because None is not iterable to invert dict
count_map = {1: ['711', '712'],
             2: ['701', '702'],
             4: ['703', '704', '705', '708', '900', '910'],
             8: ['600', '704'],
             16: ['740'],
             32: ['305', '306'],
             64: ['601', '612', '615'],
             128: ['1600', '1601', '1602']
             }
# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}
# New below:
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])
sdf2 = sdf.withColumn('count_adj',
                      F.when(col('a_type').isNull(), 0)
                       .otherwise(map_expr.getItem(col('a_type'))))
Upvotes: 1
Reputation: 1171
Map keys must all share the same data type and can't be null, so the map will not accept None/NULL values as keys.
Instead of the above code, you can use the when function, which gives your desired output as shown below:
newDF = sdf.withColumn("count_adj",
                       F.when(F.col("a_type").isNull(), 0)
                        .when(F.col("a_type").isin('711', '712'), 1)
                        .when(F.col("a_type").isin('701', '702'), 2)
                        .when(F.col("a_type").isin('703', '704', '705', '708', '900', '910'), 4)
                        .when(F.col("a_type").isin('600', '704'), 8)
                        .when(F.col("a_type").isin('740'), 16)
                        .when(F.col("a_type").isin('305', '306'), 32)
                        .when(F.col("a_type").isin('601', '612', '615'), 64)
                        .otherwise(128))
Upvotes: 0