Hardik Gupta

Reputation: 4790

Convert a pyspark.sql.dataframe.DataFrame to a Python dictionary

I have a PySpark DataFrame and I need to convert it into a Python dictionary.

The code below is reproducible:

from pyspark.sql import Row
rdd = sc.parallelize([Row(name='Alice', age=5, height=80),
                      Row(name='Alice', age=5, height=80),
                      Row(name='Alice', age=10, height=80)])
df = rdd.toDF()

Once I have this DataFrame, I need to convert it into a dictionary.

I tried this:

df.set_index('name').to_dict()

But it gives an error. How can I achieve this?

Upvotes: 20

Views: 83783

Answers (5)

pasx

Reputation: 2985

Here is a two-liner for very simple cases. Something more versatile might use a lambda and asDict to produce the value.

Assuming a DataFrame df, with key and val being strings containing column names:

rows = df.select(key, val).collect()           # bring just the two columns to the driver
result = {row[key]: row[val] for row in rows}  # one entry per row; duplicate keys keep the last value
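
For the more versatile variant mentioned above, here is a minimal sketch (assuming 'name' is the key column; adjust to your schema) that pairs each key with the whole row as a dict:

make_value = lambda row: row.asDict()
nested = {row['name']: make_value(row) for row in df.collect()}
# e.g. {'Alice': {'name': 'Alice', 'age': 10, 'height': 80}, ...}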

Upvotes: 0

Animesh Srivastava

Reputation: 89

One easy way is to collect the rows and iterate over them with a dictionary comprehension. Here I will try to demonstrate something similar:

Let's assume a movie DataFrame:

movie_df

movieId   avg_rating
1         3.92
10        3.5
100       2.79
100044    4.0
100068    3.5
100083    3.5
100106    3.5
100159    4.5
100163    2.9
100194    4.5

We can use a dictionary comprehension and iterate over the collected rows as shown below:

movie_dict = {int(row.asDict()['movieId']): row.asDict()['avg_rating'] for row in movie_df.collect()}
print(movie_dict)
{1: 3.92,
 10: 3.5,
 100: 2.79,
 100044: 4.0,
 100068: 3.5,
 100083: 3.5,
 100106: 3.5,
 100159: 4.5,
 100163: 2.9,
 100194: 4.5}

Upvotes: 4

Adam Ranganathan

Reputation: 1717

Row objects have a built-in method asDict() that lets you represent each row as a dict.

If you have a DataFrame df, convert it to an RDD and apply asDict() to each Row.

new_rdd = df.rdd.map(lambda row: row.asDict(True))  # True = recursive, so nested Rows become dicts too

You can then use new_rdd to perform normal Python map operations, like:

# You can define normal python functions like below and plug them when needed
def transform(row):
    # Add a new key to each row
    row["new_key"] = "my_new_value"
    return row

new_rdd = new_rdd.map(lambda row: transform(row))
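
To get the single dictionary the question asks for, a minimal follow-up sketch (assuming a 'name' column as in the question; later rows overwrite earlier ones with the same name):

persons = new_rdd.collect()               # bring the per-row dicts to the driver
result = {p['name']: p for p in persons}  # key each dict by its name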

Upvotes: 6

mtoto

Reputation: 24198

You first need to convert to a pandas.DataFrame using toPandas(), then you can use the to_dict() method on the transposed DataFrame with orient='list':

df.toPandas().set_index('name').T.to_dict('list')
# Out[1]: {u'Alice': [10, 80]}
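
A step-by-step sketch of what that chain does with the question's sample data (the intermediate variable names here are just for illustration):

pdf = df.toPandas()                 # bring the Spark DataFrame to the driver as pandas
indexed = pdf.set_index('name')     # index the remaining columns (age, height) by name
result = indexed.T.to_dict('list')  # transpose, then one list of column values per name
# result == {'Alice': [10, 80]} -- duplicate 'Alice' index labels collapse to the last row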

Upvotes: 26

Fokko Driesprong

Reputation: 2250

Please see the example below:

>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
            .map(lambda line: line.split(","))
            .toDF(['name','age','height'])
            .select(col('name'), col('age').cast('int'), col('height').cast('int')))

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|  Bob|  5|    80|
|Alice| 10|    80|
+-----+---+------+

>>> list_persons = [row.asDict() for row in df.collect()]
>>> list_persons
[
    {'age': 5, 'name': 'Alice', 'height': 80},
    {'age': 5, 'name': 'Bob', 'height': 80},
    {'age': 10, 'name': 'Alice', 'height': 80}
]

>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{'Bob': {'age': 5, 'name': 'Bob', 'height': 80}, 'Alice': {'age': 10, 'name': 'Alice', 'height': 80}}

The input data.txt that I'm using for testing:

Alice,5,80
Bob,5,80
Alice,10,80

First we do the loading by using PySpark to read the lines. Then we convert the lines to columns by splitting on the comma. Then we convert the plain RDD to a DataFrame and give the columns names. Finally we cast the columns to the appropriate types.

Then we collect everything to the driver, and with a Python list comprehension we convert the data to the preferred form. We convert each Row object to a dictionary using the asDict() method. In the output we can observe that Alice appears only once, which is of course because the key 'Alice' gets overwritten.
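
If overwriting is not what you want, here is a minimal sketch of keeping every row per name instead (plain Python on the driver, reusing list_persons from above):

from collections import defaultdict

persons_by_name = defaultdict(list)
for person in list_persons:
    persons_by_name[person['name']].append(person)
# persons_by_name['Alice'] now holds both Alice rows instead of only the last one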

Please keep in mind that you want to do all the processing and filtering inside PySpark before returning the result to the driver.
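
For example, a hedged sketch of filtering before the collect (the age condition is just illustrative):

>>> [row.asDict() for row in df.filter(col('age') >= 10).collect()]
[{'age': 10, 'name': 'Alice', 'height': 80}]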

Hope this helps, cheers.

Upvotes: 36
