Paco

Reputation: 415

PySpark: Convert Map Column Keys Using Dictionary

I have a PySpark DataFrame with a map column as below:

root
 |-- id: long (nullable = true)
 |-- map_col: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)

The map_col has keys which need to be converted based on a dictionary. For example, the dictionary might be:

mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8' }

So, the DataFrame needs to change from:

[Row(id=123, map_col={'a': 0.0, 'b': -42.19}),
 Row(id=456, map_col={'a': 13.25, 'c': -19.6, 'd': 15.6})]

to the following:

[Row(id=123, map_col={'1': 0.0, '2': -42.19}),
 Row(id=456, map_col={'1': 13.25, '5': -19.6, '8': 15.6})]

I see that transform_keys is an option if I could write out the dictionary, but it's too large and is dynamically generated earlier in the workflow. I think an explode/pivot could also work, but that seems non-performant?

Any ideas?

Edit: Added a bit to show that size of map in map_col is not uniform.
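
For reference, the sample DataFrame can be reproduced with something like the following (a minimal sketch; the active SparkSession spark and the name df are assumed here, not part of the original question):

from pyspark.sql import Row

# illustrative reconstruction of the sample data shown above
df = spark.createDataFrame([
    Row(id=123, map_col={'a': 0.0, 'b': -42.19}),
    Row(id=456, map_col={'a': 13.25, 'c': -19.6, 'd': 15.6}),
])

mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8'}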

Upvotes: 1

Views: 1896

Answers (3)

wwnde

Reputation: 26676

Another way:

Use itertools to build a map expression from the dictionary and inject it into PySpark's transform_keys function; upper is applied just in case. Code below:

from itertools import chain
from pyspark.sql.functions import create_map, lit, transform_keys, upper

# build a literal map expression from the Python dict
m_expr1 = create_map([lit(x) for x in chain(*mapping.items())])

# replace each key with its lookup in the map expression
new = df.withColumn('new_map_col', transform_keys("map_col", lambda k, _: upper(m_expr1[k])))

new.show(truncate=False)

+---+-----------------------------------+-----------------------------------+
|id |map_col                            |new_map_col                        |
+---+-----------------------------------+-----------------------------------+
|123|{a -> 0.0, b -> -42.19}            |{1 -> 0.0, 2 -> -42.19}            |
|456|{a -> 13.25, c -> -19.6, d -> 15.6}|{1 -> 13.25, 5 -> -19.6, 8 -> 15.6}|
+---+-----------------------------------+-----------------------------------+
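
Note that m_expr1[k] comes back null for any key that isn't in mapping, and Spark rejects null map keys at runtime. A hedged variant (not in the original answer) falls back to the original key with coalesce:

from pyspark.sql.functions import coalesce

# assumption: keys without a mapping should be kept as-is rather than dropped
new = df.withColumn(
    'new_map_col',
    transform_keys("map_col", lambda k, _: coalesce(upper(m_expr1[k]), k))
)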

Upvotes: 1

samkart

Reputation: 6654

An approach using an RDD transformation:

def updateKey(theDict, mapDict):
    """
    update theDict's key using mapDict
    """

    updDict = []
    for item in theDict.items():
        updDict.append((mapDict[item[0]] if item[0] in mapDict.keys() else item[0], item[1]))
    
    return dict(updDict)

data_sdf.rdd. \
    map(lambda r: (r[0], r[1], updateKey(r[1], mapping))). \
    toDF(['id', 'map_col', 'new_map_col']). \
    show(truncate=False)

# +---+-----------------------------------+-----------------------------------+
# |id |map_col                            |new_map_col                        |
# +---+-----------------------------------+-----------------------------------+
# |123|{a -> 0.0, b -> -42.19, e -> 12.12}|{1 -> 0.0, 2 -> -42.19, e -> 12.12}|
# |456|{a -> 13.25, c -> -19.6, d -> 15.6}|{8 -> 15.6, 1 -> 13.25, 5 -> -19.6}|
# +---+-----------------------------------+-----------------------------------+

P.S. I added a new key ('e') to the first row's map_col to show what happens when no mapping is available.
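
As a side note (not part of the original answer), the same lookup-with-fallback could be written more compactly as a dict comprehension using dict.get:

def updateKey(theDict, mapDict):
    # mapDict.get(k, k) returns the mapped key if present, otherwise the original key
    return {mapDict.get(k, k): v for k, v in theDict.items()}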

Upvotes: 2

danielcahall

Reputation: 2752

transform_keys can take a lambda, as shown in the example; it's not limited to an expr. However, the lambda (or Python callable) has to be built from functions defined in pyspark.sql.functions, Column methods, or a Scala UDF, so using a Python UDF that refers to the mapping dictionary isn't currently possible with this mechanism. We can, however, apply the mapping with the when function by unrolling the mapping's key-value pairs into chained when conditions. The example below illustrates the idea:

from typing import Dict, Callable
from functools import reduce

from pyspark.sql.functions import Column, when, transform_keys
from pyspark.sql import SparkSession

def apply_mapping(mapping: Dict[str, str]) -> Callable[[Column, Column], Column]:

    def convert_mapping_into_when_conditions(key: Column, _: Column) -> Column:
        # popitem() removes one pair to seed the first when() condition (note: this mutates mapping)
        initial_key, initial_value = mapping.popitem()
        initial_condition = when(key == initial_key, initial_value)
        # chain the remaining key -> value pairs as additional when() branches
        return reduce(lambda x, y: x.when(key == y[0], y[1]), mapping.items(), initial_condition)

    return convert_mapping_into_when_conditions


if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("Temp")\
        .getOrCreate()
    df = spark.createDataFrame([(1, {"foo": -2.0, "bar": 2.0})], ("id", "data"))
    mapping = {'foo': 'a', 'bar': 'b'}
    df.select(transform_keys(
        "data", apply_mapping(mapping)).alias("data_transformed")
              ).show(truncate=False)

The output of the above is:

+---------------------+
|data_transformed     |
+---------------------+
|{b -> 2.0, a -> -2.0}|
+---------------------+

which demonstrates that the defined mapping (foo -> a, bar -> b) was successfully applied to the column. The apply_mapping function should be generic enough to copy and use in your own pipeline.
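
For comparison, the same chained conditions can also be written as a SQL lambda passed through expr (a sketch assuming Spark 3.x; the ELSE k fallback for unmapped keys is an addition, not part of the answer above):

from pyspark.sql.functions import expr

# hypothetical SQL equivalent of the chained when() conditions
df.select(expr("""
    transform_keys(data, (k, v) ->
        CASE WHEN k = 'foo' THEN 'a'
             WHEN k = 'bar' THEN 'b'
             ELSE k END)
""").alias("data_transformed")).show(truncate=False)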

Upvotes: 1
