Reputation: 415
I have a PySpark DataFrame with a map column as below:
root
 |-- id: long (nullable = true)
 |-- map_col: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
The map_col column has keys that need to be converted based on a dictionary. For example, the dictionary might be:
mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8' }
So the DataFrame needs to change from:
[Row(id=123, map_col={'a': 0.0, 'b': -42.19}),
Row(id=456, map_col={'a': 13.25, 'c': -19.6, 'd': 15.6})]
to the following:
[Row(id=123, map_col={'1': 0.0, '2': -42.19}),
Row(id=456, map_col={'1': 13.25, '5': -19.6, '8': 15.6})]
I see that transform_keys is an option if I could write out the dictionary, but it is too large and is generated dynamically earlier in the workflow. I think an explode/pivot could also work, but it seems non-performant.
Any ideas?
Edit: Added a bit to show that the size of the map in map_col is not uniform.
Upvotes: 1
Views: 1896
Reputation: 26676
Another way: use itertools to build a map expression from the dictionary and pass it into PySpark's transform_keys function. I used upper just in case. Code below:
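from itertools import chain
from pyspark.sql.functions import create_map, lit, transform_keys, upper

# Flatten the dict into [key1, value1, key2, value2, ...] and build a literal map column.
m_expr1 = create_map([lit(x) for x in chain(*mapping.items())])
# Look up each existing key in the literal map to get its replacement.
new = df.withColumn('new_map_col', transform_keys("map_col", lambda k, _: upper(m_expr1[k])))
new.show(truncate=False)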
+---+-----------------------------------+-----------------------------------+
|id |map_col |new_map_col |
+---+-----------------------------------+-----------------------------------+
|123|{a -> 0.0, b -> -42.19} |{1 -> 0.0, 2 -> -42.19} |
|456|{a -> 13.25, c -> -19.6, d -> 15.6}|{1 -> 13.25, 5 -> -19.6, 8 -> 15.6}|
+---+-----------------------------------+-----------------------------------+
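To make the create_map step above less opaque, here is a small sketch, in plain Python and without Spark, of what chain(*mapping.items()) produces. The variable name flat is only illustrative and is not part of the answer's code.

```python
from itertools import chain

mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8'}

# mapping.items() yields (key, value) pairs; chain(*...) flattens them into
# one alternating sequence, which is the argument layout create_map expects.
flat = list(chain(*mapping.items()))
print(flat)  # ['a', '1', 'b', '2', 'c', '5', 'd', '8']
```

One caveat, stated as an assumption about your data: if map_col contains a key that is missing from mapping, the lookup m_expr1[k] typically evaluates to null, and Spark rejects null map keys. Wrapping the lookup as coalesce(m_expr1[k], k) is one way to leave such keys unchanged.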
Upvotes: 1
Reputation: 6654
An approach using an RDD transformation:
def updateKey(theDict, mapDict):
    """
    update theDict's key using mapDict
    """
    updDict = []
    for item in theDict.items():
        updDict.append((mapDict[item[0]] if item[0] in mapDict.keys() else item[0], item[1]))
    return dict(updDict)

data_sdf.rdd. \
    map(lambda r: (r[0], r[1], updateKey(r[1], mapping))). \
    toDF(['id', 'map_col', 'new_map_col']). \
    show(truncate=False)
# +---+-----------------------------------+-----------------------------------+
# |id |map_col |new_map_col |
# +---+-----------------------------------+-----------------------------------+
# |123|{a -> 0.0, b -> -42.19, e -> 12.12}|{1 -> 0.0, 2 -> -42.19, e -> 12.12}|
# |456|{a -> 13.25, c -> -19.6, d -> 15.6}|{8 -> 15.6, 1 -> 13.25, 5 -> -19.6}|
# +---+-----------------------------------+-----------------------------------+
P.S. I added a new key to the first row of map_col to show what happens when no mapping is available: the original key is kept.
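The fallback behaviour of updateKey can be checked without a Spark session. The sketch below is a compact variant under a hypothetical name, remap_keys, that uses dict.get for the fallback. It also passes None through, on the assumption that map_col may hold nulls, since the schema marks it as nullable.

```python
from typing import Dict, Optional


def remap_keys(row_map: Optional[Dict[str, float]],
               key_map: Dict[str, str]) -> Optional[Dict[str, float]]:
    """Return a copy of row_map with its keys translated through key_map."""
    if row_map is None:  # assumption: a null map should stay null
        return None
    # dict.get(k, k) falls back to the original key when no mapping exists
    return {key_map.get(k, k): v for k, v in row_map.items()}


mapping = {'a': '1', 'b': '2', 'c': '5', 'd': '8'}
print(remap_keys({'a': 0.0, 'b': -42.19, 'e': 12.12}, mapping))
# {'1': 0.0, '2': -42.19, 'e': 12.12}
```

It should be usable in place of updateKey inside the rdd.map call, with the same arguments.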
Upvotes: 2
Reputation: 2752
transform_keys can use a lambda, as shown in the example; it is not limited to an expr. However, the lambda or Python callable needs to use a function defined in pyspark.sql.functions, a Column method, or a Scala UDF, so using a Python UDF that refers to the mapping dictionary object isn't currently possible with this mechanism. Instead, we can use the when function to apply the mapping, by unrolling the key-value pairs in the mapping into chained when conditions. The example below illustrates the idea:
from functools import reduce
from typing import Callable, Dict

from pyspark.sql import Column, SparkSession
from pyspark.sql.functions import transform_keys, when


def apply_mapping(mapping: Dict[str, str]) -> Callable[[Column, Column], Column]:
    def convert_mapping_into_when_conditions(key: Column, _: Column) -> Column:
        # Unpack the items instead of calling popitem(), so the caller's dictionary is not mutated.
        (initial_key, initial_value), *remaining = mapping.items()
        initial_condition = when(key == initial_key, initial_value)
        chained = reduce(lambda x, y: x.when(key == y[0], y[1]), remaining, initial_condition)
        # Keep keys that have no mapping; without this they would become null, which is not a valid map key.
        return chained.otherwise(key)
    return convert_mapping_into_when_conditions


if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("Temp")\
        .getOrCreate()
    df = spark.createDataFrame([(1, {"foo": -2.0, "bar": 2.0})], ("id", "data"))
    mapping = {'foo': 'a', 'bar': 'b'}
    df.select(transform_keys(
        "data", apply_mapping(mapping)).alias("data_transformed")
    ).show(truncate=False)
The output of the above is:
+---------------------+
|data_transformed |
+---------------------+
|{b -> 2.0, a -> -2.0}|
+---------------------+
which demonstrates that the defined mapping (foo -> a, bar -> b) was successfully applied to the column. The apply_mapping function should be generic enough to copy and use in your own pipeline.
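For comparison, the same unrolling can be written as a SQL string and handed to expr, which may suit a dictionary that is generated at runtime. The sketch below only builds the string, so it runs without Spark. The helper names sql_literal and build_transform_keys_sql are hypothetical, and the quoting assumes Spark's default backslash escaping for string literals.

```python
from typing import Dict


def sql_literal(text: str) -> str:
    """Quote a Python string as a SQL string literal (backslash escaping assumed)."""
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def build_transform_keys_sql(column: str, mapping: Dict[str, str]) -> str:
    """Build a transform_keys expression with one CASE branch per mapping entry."""
    if not mapping:
        # Nothing to rename, so the column is returned unchanged.
        return column
    branches = " ".join(
        f"WHEN {sql_literal(old)} THEN {sql_literal(new)}"
        for old, new in mapping.items()
    )
    # ELSE k keeps keys that have no entry in the mapping.
    return f"transform_keys({column}, (k, v) -> CASE k {branches} ELSE k END)"


print(build_transform_keys_sql("data", {'foo': 'a', 'bar': 'b'}))
# transform_keys(data, (k, v) -> CASE k WHEN 'foo' THEN 'a' WHEN 'bar' THEN 'b' ELSE k END)
```

The resulting string could then be used as df.select(expr(sql_string).alias("data_transformed")), with expr imported from pyspark.sql.functions. This has not been run against a cluster here, so treat it as a starting point rather than a drop-in replacement.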
Upvotes: 1