tanyabrown

Reputation: 29

Map values of a column with ArrayType based on values from another dataframe in PySpark

What I have:

| ids    |items   |item_id|value|timestamp|
+--------+--------+-------+-----+---------+
|[A,B,C] |1.0     |1      |5    |100      |
|[A,B,D] |1.0     |2      |6    |90       |
|[D]     |0.0     |3      |7    |80       |
|[C]     |0.0     |4      |8    |80       |
+--------+--------+-------+-----+---------+

| ids    |id_num  |
+--------+--------+
|A       |1       |
|B       |2       |
|C       |3       |
|D       |4       |
+--------+--------+

What I want:

| ids    |
+--------+
|[1,2,3] |      
|[1,2,4] |    
|[3]     | 
|[4]     | 
+--------+

Is there a way to do this without using explode? Thank you for your help!

Upvotes: 0

Views: 840

Answers (1)

pissall

Reputation: 7399

You can use a UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, IntegerType

# Suppose this is the dictionary you want to map
map_dict = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

def array_map(array_col):
    # map_dict.get returns None for keys missing from the dictionary
    return list(map(map_dict.get, array_col))
    # If you prefer a list comprehension: [map_dict[k] for k in array_col]
    # (note that this raises a KeyError on missing keys instead of returning None)

# ArrayType requires an element type; the mapped values here are integers
array_map_udf = udf(array_map, ArrayType(IntegerType()))

df = df.withColumn("mapped_array", array_map_udf(col("ids")))
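
For illustration, here is a minimal run on the sample data from the question, assuming the UDF above is in scope (the dataframe construction is my own sketch, not part of the original setup):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Recreate the "ids" column from the question's first dataframe
df = spark.createDataFrame(
    [(['A', 'B', 'C'],), (['A', 'B', 'D'],), (['D'],), (['C'],)],
    ['ids']
)
df.withColumn("mapped_array", array_map_udf(col("ids"))).show()
# +---------+------------+
# |      ids|mapped_array|
# +---------+------------+
# |[A, B, C]|   [1, 2, 3]|
# |[A, B, D]|   [1, 2, 4]|
# |      [D]|         [4]|
# |      [C]|         [3]|
# +---------+------------+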

I can't think of a different method. If you want to build the mapping dictionary from the reference dataframe itself, you can use the toJSON method; it will require further processing depending on the kind of reference df you have:

import json

# Parse each row of the dataframe into a Python dict
df_json = df.toJSON().map(lambda x: json.loads(x))
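
As a sketch of that further processing, assuming the second dataframe from the question is named ref_df (a hypothetical name) with the ids/id_num columns shown above, you could collect the parsed rows into the dictionary the UDF needs:

# ref_df is a hypothetical name for the reference dataframe in the question
rows = ref_df.toJSON().map(lambda x: json.loads(x)).collect()
map_dict = {row['ids']: row['id_num'] for row in rows}
# map_dict is now {'A': 1, 'B': 2, 'C': 3, 'D': 4}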

Upvotes: 2
