tlk27

Reputation: 309

Pyspark NULL mapping key

Pardon my ignorance, I am new to pyspark. I'm trying to improve a udf that creates a new column count_adj based on values from another column a_type, by using a dictionary lookup instead. How do I account for None / NULL values in this process? This is super easy in pandas (df['adj_count'] = df.a_type.map(count_map)), but I'm struggling to do it in pyspark.
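
For reference, here is a minimal sketch of the pandas approach I mean, using a hypothetical three-row frame rather than my real data:

import pandas as pd

# hypothetical mini-frame; the real Spark sample data is below
pdf = pd.DataFrame({'a_type': ['740', '600', None]})
count_map = {'740': 16, '600': 8, None: 0}   # a plain dict happily takes None as a key

pdf['adj_count'] = pdf.a_type.map(count_map)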

Sample data / imports:

# all imports used -- not just for this portion of the script
from pyspark.sql import SparkSession, HiveContext, SQLContext
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import sql
import pyspark.sql.functions as F
import random
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from datetime import datetime
from datetime import date
from datetime import timedelta
from pyspark.sql import Window
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import rank, row_number, max as max_, col
from itertools import chain  # used to flatten count_map for create_map below
import sys
import os

spark = SparkSession.builder.appName('a_type_tests').getOrCreate()

# note: sample data has results from the original udf for comparison
dataDictionary = [
(26551, 491, '2022-01-22', '740', -1, 16),
(24192, 338, '2022-01-22', '740', -1, 16),
(26555, 3013, '2022-01-22', '740', -1, 16),
(26571, 937, '2022-01-22', '740', -1, 16),
(24376, 371, '2022-01-22', '740', -1, 16),
(17716, 118, '2022-01-22', '740', -1, 16),
(26554, 3013, '2022-01-22', '740', -1, 16),
(26734, 105, '2022-01-22', '740', -1, 16),
(26051, 415, '2022-01-22', '600', -1, 8),
(26602, 501, '2022-01-22', '740', -1, 16),
(26125, 501, '2022-01-22', None, -1, 0)
        ]

sdf = spark.createDataFrame(data=dataDictionary, schema = ['id', 'loc_id', 'a_date', 'a_type', 'adj_val', 'udf_original'])
sdf.printSchema()
sdf.show(truncate=False)

The original udf is similar to:

def count_adj(a_type):
    if a_type is None:
        return 0
    elif a_type in ('703','704','705','708','900','910'):
        return 4
    elif a_type in ('701','702'):
        return 2
    elif a_type in ('711','712'):
        return 1
    elif a_type in ('600', '704'):
        return 8
    elif a_type in ('740',):
        return 16
    elif a_type in ('305','306'):
        return 32
    elif a_type in ('601','612','615'):
        return 64
    else:
        return 128
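
For context, the udf_original column in the sample data comes from applying this function as a Python udf, roughly like the sketch below (the exact registration isn't shown in my excerpt, so the names here are placeholders):

# rough sketch of how the original udf gets applied (names are placeholders)
count_adj_udf = udf(count_adj, IntegerType())
sdf_check = sdf.withColumn('udf_check', count_adj_udf(col('a_type')))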

I've created a dictionary to correspond to these values.

# remove 0:None type pairing because None is not iterable to invert dict
count_map = {1:['711','712'], \
             2:['701','702'], \
             4:['703','704','705','708','900','910'], \
             8:['600', '704'], \
            16:['740'], \
            32:['305','306'], \
            64:['601','612','615'], \
           128: ['1600', '1601', '1602']
            }

# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}

# create None mapping manually
count_map[None] = 0
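
After the inversion and the manual None entry, count_map is a flat code-to-value lookup, e.g. {'711': 1, '712': 1, '701': 2, ..., '740': 16, ..., None: 0}:

# spot-check the flattened mapping
print(count_map['740'])   # 16
print(count_map['703'])   # 4
print(count_map[None])    # 0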

Searching SO, I came across this, which resulted in the following error:

# Code Tried:

# Changes None type to NULL -- fine, but how do I account for these None/NULL values in my dict?
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])

sdf2 = sdf.withColumn('count_adj', map_expr.getItem(col('a_type')))

# or:

sdf2 = sdf.withColumn('count_adj',map_expr[col('a_type')]).show()
# Error

Py4JJavaError: An error occurred while calling o334.showString.
: java.lang.RuntimeException: Cannot use null as map key.

How do I account for None / NULL values when using a dictionary to create a new column based on values from another column? Is it possible to include a NULL check in my map expression, or do I need something else entirely?

Upvotes: 1

Views: 1893

Answers (2)

tlk27

Reputation: 309

For completeness: I removed the None entry from the dictionary and used an approach similar to Karthik's answer, combined with the other SO posts mentioned in the question.

My final solution relies on the code below, using .when() and .isNull() to handle the None / NULL values.

# Original Mapping
# remove 0:None type pairing because None is not iterable to invert dict
count_map = {1:['711','712'], \
             2:['701','702'], \
             4:['703','704','705','708','900','910'], \
             8:['600', '704'], \
            16:['740'], \
            32:['305','306'], \
            64:['601','612','615'], \
           128: ['1600', '1601', '1602']
            }

# invert dict
count_map = {c:key for key, vals in count_map.items() for c in vals}

# New below:
map_expr = F.create_map([lit(x) for x in chain(*count_map.items())])

sdf2 = sdf.withColumn('count_adj', F.when(col('a_type').isNull(), 0).otherwise(map_expr.getItem(col('a_type'))))
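
An alternative sketch (not what I ended up using) is F.coalesce, which falls back to 0 whenever the map lookup returns NULL. Note it behaves slightly differently from the .when() version above: codes that are missing from the dict also become 0 instead of staying NULL.

# alternative sketch: coalesce the map lookup with a literal 0
sdf2_alt = sdf.withColumn('count_adj', F.coalesce(map_expr.getItem(col('a_type')), lit(0)))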

Upvotes: 1

Karthik

Reputation: 1171

The key columns of a map must all have the same data type, and they can't be null: the map will not accept None/NULL values as keys.

Instead of the above code, you can use the when function, which gives your desired output, as shown below:

newDF = sdf.withColumn("count_adj",F.when(F.col("a_type").isNull(),0)\
       .when(F.col("a_type").isin('711','712'),1)\
       .when(F.col("a_type").isin('701','702'),2)\
       .when(F.col("a_type").isin('703','704','705','708','900','910'),4)\
       .when(F.col("a_type").isin('600', '704'),8)\
       .when(F.col("a_type").isin('740'),16)\
       .when(F.col("a_type").isin('305','306'),32)\
       .when(F.col("a_type").isin('601','612','615'),64)\
       .otherwise(128))
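
As a quick check (assuming the sample sdf from the question is in scope), the new column can be compared against the udf_original values carried in the sample data:

# sample rows: '740' -> 16, '600' -> 8, NULL -> 0, matching udf_original
newDF.select('a_type', 'udf_original', 'count_adj').show()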

[screenshot of the resulting DataFrame showing the new count_adj column]

Upvotes: 0
