PriyaKod
PriyaKod

Reputation: 113

Merge multiple rows of a dataframe into one record

I need to merge all the rows in a PySpark DataFrame into a list then add an additional attribute to send to an API in batches. Here is the way the json from the df looks like

{"event_type":"click","visitor_platform":"mobile","visitor_country":"CA","mp_os":"Android", "user_properties":{"distinct_id":123, "user_id":345} }{"event_type":"click","visitor_platform":"mobile","visitor_country":"US","mp_os":"Android", "user_properties":{"distinct_id":321, "user_id":543} }

add extra attribute called load and convert to single list of records

{ "load": 123, "events": [ { "event_type":"click","visitor_platform":"mobile","visitor_country":"CA","mp_os":"Android", "user_properties":{"distinct_id":123, "user_id":345} },{"event_type":"click","visitor_platform":"mobile","visitor_country":"US","mp_os":"Android", "user_properties":{"distinct_id":321, "user_id":543} } ] }

Upvotes: 0

Views: 300

Answers (1)

pissall
pissall

Reputation: 7399

You create a new dictionary. And within events, you can called df.toJSON().collect()

>>> df.show()
+-------+------+-------------------+
|user_id|amount|         trans_date|
+-------+------+-------------------+
|    101| 99.10|2019-06-04 00:00:00|
|    102| 89.27|2019-06-04 00:00:00|
|    102| 89.10|2019-03-04 00:00:00|
|    103| 73.11|2019-09-10 00:00:00|
|    101|-69.81|2019-09-11 00:00:00|
|    101| 12.51|2018-12-14 00:00:00|
|    101| 43.23|2018-09-11 00:00:00|
+-------+------+-------------------+
>>> dict1 = {"load": 123, "events": df.toJSON().collect()}
>>> dict1
{'load': 123, 'events': ['{"user_id":"101","amount":"99.10","trans_date":"2019-06-04T00:00:00.000+05:30"}', '{"user_id":"102","amount":"89.27","trans_date":"2019-06-04T00:00:00.000+05:30"}', '{"user_id":"102","amount":"89.10","trans_date":"2019-03-04T00:00:00.000+05:30"}', '{"user_id":"103","amount":"73.11","trans_date":"2019-09-10T00:00:00.000+05:30"}', '{"user_id":"101","amount":"-69.81","trans_date":"2019-09-11T00:00:00.000+05:30"}', '{"user_id":"101","amount":"12.51","trans_date":"2018-12-14T00:00:00.000+05:30"}', '{"user_id":"101","amount":"43.23","trans_date":"2018-09-11T00:00:00.000+05:30"}']}

If you don't like the JSON strings in place of dict objects, you can use json.loads to convert it into a python dict

>>> import json
>>> dict2 = {"load": 123, "events": [json.loads(x) for x in df.toJSON().collect()]}
{'load': 123, 'events': [{'user_id': '101', 'amount': '99.10', 'trans_date': '2019-06-04T00:00:00.000+05:30'}, {'user_id': '102', 'amount': '89.27', 'trans_date': '2019-06-04T00:00:00.000+05:30'}, {'user_id': '102', 'amount': '89.10', 'trans_date': '2019-03-04T00:00:00.000+05:30'}, {'user_id': '103', 'amount': '73.11', 'trans_date': '2019-09-10T00:00:00.000+05:30'}, {'user_id': '101', 'amount': '-69.81', 'trans_date': '2019-09-11T00:00:00.000+05:30'}, {'user_id': '101', 'amount': '12.51', 'trans_date': '2018-12-14T00:00:00.000+05:30'}, {'user_id': '101', 'amount': '43.23', 'trans_date': '2018-09-11T00:00:00.000+05:30'}]}

Upvotes: 1

Related Questions