I have a "countries" DataFrame with a map column:
+--------------------+
| map|
+--------------------+
|[1-> Spain |
|[2-> Germany ...|
|[3-> Czech Repu...|
|[4-> Malta ...|
+--------------------+
How can I access a value from the map using its key? And how can I map the values of a column in another DataFrame using this map DataFrame?
So, from a "sale" DataFrame like this:
+----------+----+
|country_id|Sale|
+----------+----+
|         1| 200|
|         2| 565|
+----------+----+
the country_id value would be mapped to the country name (and the country_id column dropped):
+-------+----+
|country|Sale|
+-------+----+
|  Spain| 200|
|Germany| 565|
+-------+----+
I know about alternative approaches such as joins or dictionary maps, but this question is only about Spark maps. I tried functions like element_at, but they didn't work properly.
If you are starting with the two DataFrames as shown in your example, the idiomatic way to obtain your desired output is through a join. (Assuming your map DataFrame is small relative to the Sale DataFrame, you can probably get away with using a broadcast join.)
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, explode
from pyspark.sql.types import IntegerType, MapType, StringType
from pyspark.sql.types import StructType, StructField

spark = SparkSession.builder.getOrCreate()
# set up data
map_df = spark.createDataFrame(
[({1: "Spain"},),({2: "Germany"},),({3: "Czech Republic"},),({4: "Malta"},)],
schema=StructType([StructField("map", MapType(IntegerType(), StringType()))])
)
sale_df = spark.createDataFrame([(1, 200), (2, 565)], ["country_id","Sale"])
# explode each map into (country_id, country) rows, then broadcast-join on country_id
sale_df.join(
broadcast(map_df.select(explode("map").alias("country_id", "country"))),
on="country_id",
how="left"
).select("country", "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#| Spain| 200|
#|Germany| 565|
#+-------+----+
If instead you had your mapping as a single MapType, you could avoid the join by pushing the evaluation of the map up in the execution plan.
from pyspark.sql.functions import array, map_from_arrays, lit
my_dict = {1: "Spain", 2: "Germany", 3: "Czech Republic", 4: "Malta"}
my_map = map_from_arrays(
array(*map(lit, my_dict.keys())),
array(*map(lit, my_dict.values()))
)
print(my_map)
#Column<map_from_arrays(array(1, 2, 3, 4), array(Spain, Germany, Czech Republic, Malta))>
Now use getItem in your select statement:
sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").show()
#+-------+----+
#|country|Sale|
#+-------+----+
#| Spain| 200|
#|Germany| 565|
#+-------+----+
And the execution plan:
sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").explain()
#== Physical Plan ==
#*(1) Project [keys: [1,2,3,4], values: [Spain,Germany,Czech Republic,Malta][cast(country_id#6L as int)] AS country#62, Sale#7L]
#+- Scan ExistingRDD[country_id#6L,Sale#7L]
Could you transform the data you have in the first method (a DataFrame) into the single map used in the second method? Yes, but it's almost surely not worth the overhead to do so.
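For completeness, here is a minimal sketch of that conversion, assuming `map_df` is small enough to `collect()` to the driver (collecting it is exactly the overhead in question). `merge_row_maps` and `to_map_column` are hypothetical helper names, not part of PySpark. The PySpark import sits inside `to_map_column` so the pure-Python merge step can be used on its own.

```python
def merge_row_maps(row_maps):
    """Hypothetical helper: merge the per-row dicts of map_df into one dict.

    A MapType value comes back as a Python dict after collect(), so each
    element of row_maps is a dict like {1: "Spain"}. If a key appears in
    more than one row, the later row wins (dict.update overwrites).
    """
    merged = {}
    for m in row_maps:
        merged.update(m)
    return merged


def to_map_column(lookup):
    """Hypothetical helper: turn a Python dict into a literal MapType column.

    Same construction as my_map above: one array of key literals, one array
    of value literals, paired element-wise by map_from_arrays.
    """
    # Imported here so merge_row_maps does not require pyspark.
    from pyspark.sql.functions import array, lit, map_from_arrays

    keys = list(lookup.keys())  # fixed order shared by both arrays
    return map_from_arrays(
        array(*[lit(k) for k in keys]),
        array(*[lit(lookup[k]) for k in keys]),
    )


# Usage with map_df / sale_df from the first method (needs a SparkSession):
# lookup = merge_row_maps(row["map"] for row in map_df.collect())
# my_map = to_map_column(lookup)
# sale_df.select(my_map.getItem(col("country_id")).alias("country"), "Sale").show()
```

Because `collect()` pulls every row of `map_df` to the driver, this only makes sense when the mapping is small; otherwise the broadcast join is the better fit.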