Reputation: 1717
I have a dataframe that looks as below:
items_df
======================================================
| customer item_type brand price quantity |
|====================================================|
| 1 bread reems 20 10 |
| 2 butter spencers 10 21 |
| 3 jam niles 10 22 |
| 1 bread marks 16 18 |
| 1 butter jims 19 12 |
| 1 jam jills 16 6 |
| 2 bread marks 16 18 |
======================================================
I create an RDD that converts each row of the above to a dict:
rdd = items_df.rdd.map(lambda row: row.asDict())
The result looks like:
[
{ "customer": 1, "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10 },
{ "customer": 2, "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21 },
{ "customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22 },
{ "customer": 1, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 },
{ "customer": 1, "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12 },
{ "customer": 1, "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6 },
{ "customer": 2, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18 }
]
I would like to group the above rows by customer and then, within each customer, collect the rows under new custom keys "breads", "butters", and "jams" according to their item_type. My RDD would thus reduce from 7 rows to 3 rows, one per customer.
The output would look as below:
[
{
"customer": 1,
"breads": [
{"item_type": "bread", "brand": "reems", "price": 20, "quantity": 10},
{"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
],
"butters": [
{"item_type": "butter", "brand": "jims", "price": 19, "quantity": 12}
],
"jams": [
{"item_type": "jam", "brand": "jills", "price": 16, "quantity": 6}
]
},
{
"customer": 2,
"breads": [
{"item_type": "bread", "brand": "marks", "price": 16, "quantity": 18}
],
"butters": [
{"item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21}
],
"jams": []
},
{
"customer": 3,
"breads": [],
"butters": [],
"jams": [
{"item_type": "jam", "brand": "niles", "price": 10, "quantity": 22}
]
}
]
Would anyone know how to achieve the above using PySpark? I would like to know if there is a solution using reduceByKey() or something similar. I am hoping to avoid the use of groupByKey() if possible.
Upvotes: 1
Views: 1321
Reputation: 1717
I also found another approach that uses reduceByKey() on the RDD. Given the dataframe items_df, first convert it to an RDD of dicts:
rdd = items_df.rdd.map(lambda row: row.asDict())
Transform each row into a tuple (customer, [row_obj]), where row_obj is wrapped in a list:
rdd = rdd.map(lambda row: ( row["customer"], [row] ) )
Group by customer using reduceByKey, where the lists are concatenated for a given customer:
rdd = rdd.reduceByKey(lambda x,y: x+y)
Transform each tuple back into a dict whose key is the customer and whose value is the list of all rows for that customer:
rdd = rdd.map(lambda tup: { tup[0]: tup[1] } )
Since all the data for each customer is now in a single row, we can sort the items into breads, butters, and jams using a custom function:
def organize_items_in_customer(row):
    # Each row is a single-entry dict: { customer_id: [item rows] }
    cust_id = list(row.keys())[0]
    items = row[cust_id]
    new_cust_obj = { "customer": cust_id, "breads": [], "butters": [], "jams": [] }
    plurals = { "bread":"breads", "butter":"butters", "jam":"jams" }
    for item in items:
        item_type = item["item_type"]
        key = plurals[item_type]
        new_cust_obj[key].append(item)
    return new_cust_obj
Call the above function to transform rdd:
rdd = rdd.map(organize_items_in_customer)
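As a possible variation on the steps above, the bucketing can be done while the rows are being combined, so no intermediate per-customer list is built. The sketch below is plain Python that only emulates the per-partition fold and cross-partition merge locally; the names `empty_buckets`, `seq_op`, `comb_op`, and `aggregate_locally` are hypothetical helpers, not part of the original answer, and the two-way split of the rows is just an assumption for illustration. Unlike the function above, it also drops the "customer" field from each nested item, which matches the output shown in the question.

```python
# Maps each item_type to the plural key used in the output (same mapping as above).
PLURALS = {"bread": "breads", "butter": "butters", "jam": "jams"}


def empty_buckets():
    """Fresh accumulator: one empty list per plural key."""
    return {plural: [] for plural in PLURALS.values()}


def seq_op(buckets, row):
    """Fold one row dict into a customer's buckets, dropping the 'customer' field."""
    item = {k: v for k, v in row.items() if k != "customer"}
    buckets[PLURALS[row["item_type"]]].append(item)
    return buckets


def comb_op(left, right):
    """Merge two partial bucket dicts that belong to the same customer."""
    for key, items in right.items():
        left[key].extend(items)
    return left


def aggregate_locally(partitions):
    """Plain-Python stand-in for a keyed aggregation over a list of partitions."""
    partials = []
    for part in partitions:
        acc = {}
        for row in part:
            cust = row["customer"]
            acc[cust] = seq_op(acc.get(cust, empty_buckets()), row)
        partials.append(acc)
    merged = {}
    for acc in partials:
        for cust, buckets in acc.items():
            merged[cust] = comb_op(merged[cust], buckets) if cust in merged else buckets
    return [{"customer": cust, **merged[cust]} for cust in sorted(merged)]


rows = [
    {"customer": 1, "item_type": "bread", "brand": "reems", "price": 20, "quantity": 10},
    {"customer": 2, "item_type": "butter", "brand": "spencers", "price": 10, "quantity": 21},
    {"customer": 3, "item_type": "jam", "brand": "niles", "price": 10, "quantity": 22},
    {"customer": 1, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18},
    {"customer": 1, "item_type": "butter", "brand": "jims", "price": 19, "quantity": 12},
    {"customer": 1, "item_type": "jam", "brand": "jills", "price": 16, "quantity": 6},
    {"customer": 2, "item_type": "bread", "brand": "marks", "price": 16, "quantity": 18},
]

# Pretend the rows are split across two partitions.
result = aggregate_locally([rows[:4], rows[4:]])
```

On a real RDD of row dicts, the same two functions could presumably be passed to aggregateByKey, along the lines of rdd.map(lambda row: (row["customer"], row)).aggregateByKey(empty_buckets(), seq_op, comb_op). That call is not exercised here, and the order of items inside each list may differ from the local run depending on how Spark partitions the data.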
Upvotes: 1
Reputation: 6166
First, add a column item_types to pivot the dataframe on.
from pyspark.sql import functions as F
items_df = items_df.withColumn('item_types', F.concat(F.col('item_type'),F.lit('s')))
items_df.show()
+--------+---------+--------+-----+--------+----------+
|customer|item_type| brand|price|quantity|item_types|
+--------+---------+--------+-----+--------+----------+
| 1| bread| reems| 20| 10| breads|
| 2| butter|spencers| 10| 21| butters|
| 3| jam| niles| 10| 22| jams|
| 1| bread| marks| 16| 18| breads|
| 1| butter| jims| 19| 12| butters|
| 1| jam| jills| 16| 6| jams|
| 2| bread| marks| 16| 18| breads|
+--------+---------+--------+-----+--------+----------+
Then you can group by customer, pivot the table on item_types, and use F.collect_list() to aggregate the other columns at the same time.
items_df = items_df.groupby(['customer']).pivot("item_types").agg(
    F.collect_list(F.struct(F.col("item_type"),F.col("brand"), F.col("price"),F.col("quantity")))
).sort('customer')
items_df.show()
+--------+--------------------+--------------------+--------------------+
|customer| breads| butters| jams|
+--------+--------------------+--------------------+--------------------+
| 1|[[bread, reems, 2...|[[butter, jims, 1...|[[jam, jills, 16,...|
| 2|[[bread, marks, 1...|[[butter, spencer...| []|
| 3| []| []|[[jam, niles, 10,...|
+--------+--------------------+--------------------+--------------------+
Finally, you need to set recursive=True so that the nested Row objects are converted into dicts as well.
rdd = items_df.rdd.map(lambda row: row.asDict(recursive=True))
print(rdd.take(10))
[{'customer': 1,
'breads': [{'item_type': u'bread', 'brand': u'reems', 'price': 20, 'quantity': 10},
{'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
'butters': [{'item_type': u'butter', 'brand': u'jims', 'price': 19, 'quantity': 12}],
'jams': [{'item_type': u'jam', 'brand': u'jills', 'price': 16, 'quantity': 6}]},
{'customer': 2,
'breads': [{'item_type': u'bread', 'brand': u'marks', 'price': 16, 'quantity': 18}],
'butters': [{'item_type': u'butter', 'brand': u'spencers', 'price': 10, 'quantity': 21}],
'jams': []},
{'customer': 3,
'breads': [],
'butters': [],
'jams': [{'item_type': u'jam', 'brand': u'niles', 'price': 10, 'quantity': 22}]}]
Upvotes: 1