user3114645

Reputation: 57

Convert CSV dict column into rows in PySpark

My CSV file contains two columns:

  1. Id
  2. cbgs (dictionary key/value pairs, enclosed in "" with inner quotes doubled)

The sample CSV data looks like this in Notepad (cell B2 contains the JSON key/value pairs as a string):

id,cbgs
sg:bd1f26e681264baaa4b44083891c886a,"{""060372623011"":166,""060372655203"":70,""060377019021"":34}"
sg:04c7f777f01c4c75bbd9e43180ce811f,"{""060372073012"":7}"
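As a side note, the doubled quotes in the sample are standard CSV quote escaping, which Python's built-in csv module unwraps the same way Spark's escape option does. A quick sketch using the first sample row:

```python
import csv
import io

# First data row from the sample; the inner quotes are doubled, which is
# the standard CSV escape for a quote inside a quoted field.
raw = 'sg:bd1f26e681264baaa4b44083891c886a,"{""060372623011"":166,""060372655203"":70,""060377019021"":34}"'

row = next(csv.reader(io.StringIO(raw)))
print(row[0])  # sg:bd1f26e681264baaa4b44083891c886a
print(row[1])  # {"060372623011":166,"060372655203":70,"060377019021":34}
```

The second field comes back as one JSON string with single quotes, which is what the later parsing steps expect.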

I am trying to convert it to the following:

id,cbgs,value
sg:bd1f26e681264baaa4b44083891c886a,060372623011,166
sg:bd1f26e681264baaa4b44083891c886a,060372655203,70
sg:bd1f26e681264baaa4b44083891c886a,060377019021,34
sg:04c7f777f01c4c75bbd9e43180ce811f,060372073012,7

What I have tried

1. Attempt 1

from pyspark.sql.functions import udf, explode
import json

fifa_df = spark.read.csv("D:\\1. Work\\Safegraph\\Sample Files\\Los Angels\\csv.csv", inferSchema=True, header=True)
fifa_df.printSchema()
df2.select("item", explode(parse("cbgs")).alias("recom_item", "recom_cnt")).show()

Error msg:

cannot resolve 'item' given input columns: [id, cbgs, recom_item, recom_cnt];;

Per DrChess's suggestion I tried the code below, but I get an empty result:

fifa_df.withColumn("cbgs", F.from_json("cbgs", T.MapType(T.StringType(), T.IntegerType()))).select("id", F.explode(["visitor_home_cbgs"]).alias('cbgs', 'value')).show()

+------------------+----+-----+
|safegraph_place_id|cbgs|value|
+------------------+----+-----+
+------------------+----+-----+
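For reference, here is a minimal pure-Python sketch of the intended transformation, with json.loads standing in for F.from_json and a loop standing in for F.explode (the ids and maps are taken from the sample data in the question):

```python
import json

# (id, cbgs-json-string) pairs as they would arrive after the CSV read.
rows = [
    ("sg:bd1f26e681264baaa4b44083891c886a",
     '{"060372623011":166,"060372655203":70,"060377019021":34}'),
    ("sg:04c7f777f01c4c75bbd9e43180ce811f", '{"060372073012":7}'),
]

# json.loads plays the role of from_json with a MapType schema;
# the inner loop plays the role of explode on the resulting map.
out = [(id_, k, v) for id_, cbgs in rows for k, v in json.loads(cbgs).items()]
for r in out:
    print(r)
```

This produces one (id, cbgs, value) row per map entry, which is exactly the target layout shown above.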

Upvotes: 3

Views: 1610

Answers (2)

j raj

Reputation: 169

Here is the approach I followed. It involves only string-handling operations, not complex data types.

  1. Read the source CSV file with the escape option set to ":

     df = spark.read.format('csv').option('header', 'True').option('escape', '"').load("D:\\1. Work\\Safegraph\\Sample Files\\Los Angels\\csv.csv")

+-----------------------------------+--------------------------------------------------------+
|id                                 |cbgs                                                    |
+-----------------------------------+--------------------------------------------------------+
|sg:bd1f26e681264baaa4b44083891c886a|{"060372623011":166,"060372655203":70,"060377019021":34}|
|sg:04c7f777f01c4c75bbd9e43180ce811f|{"060372073012":7}                                      |
+-----------------------------------+--------------------------------------------------------+
  2. The second column is loaded as a string rather than a map. Split it on commas:

     df = df.withColumn('cbgs', split(df['cbgs'], ','))
+-----------------------------------+------------------------------------------------------------+
|id                                 |cbgs                                                        |
+-----------------------------------+------------------------------------------------------------+
|sg:bd1f26e681264baaa4b44083891c886a|[{"060372623011":166, "060372655203":70, "060377019021":34}]|
|sg:04c7f777f01c4c75bbd9e43180ce811f|[{"060372073012":7}]                                        |
+-----------------------------------+------------------------------------------------------------+

  3. Explode the resulting array:

df=df.withColumn('cbgs',explode(df['cbgs']))

+-----------------------------------+-------------------+
|id                                 |cbgs               |
+-----------------------------------+-------------------+
|sg:bd1f26e681264baaa4b44083891c886a|{"060372623011":166|
|sg:bd1f26e681264baaa4b44083891c886a|"060372655203":70  |
|sg:bd1f26e681264baaa4b44083891c886a|"060377019021":34} |
|sg:04c7f777f01c4c75bbd9e43180ce811f|{"060372073012":7} |
+-----------------------------------+-------------------+
  4. Extract the key and value from the cbgs column using a regex:

     df = df.select(df['id'], regexp_extract(df['cbgs'], '(\d+)":(\d+)', 1).alias('cbgs'), regexp_extract(df['cbgs'], '(\d+)":(\d+)', 2).alias('value'))
+-----------------------------------+------------+-----+
|id                                 |cbgs        |value|
+-----------------------------------+------------+-----+
|sg:bd1f26e681264baaa4b44083891c886a|060372623011|166  |
|sg:bd1f26e681264baaa4b44083891c886a|060372655203|70   |
|sg:bd1f26e681264baaa4b44083891c886a|060377019021|34   |
|sg:04c7f777f01c4c75bbd9e43180ce811f|060372073012|7    |
+-----------------------------------+------------+-----+
  5. Write the result to CSV.
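The regex used in step 4 can be checked in plain Python before running it through regexp_extract. A small sketch using the exploded fragments from step 3:

```python
import re

# The three fragments the explode in step 3 produces for the first id;
# the regex pulls out the census-block key (group 1) and count (group 2).
fragments = ['{"060372623011":166', '"060372655203":70', '"060377019021":34}']
pattern = r'(\d+)":(\d+)'

pairs = [re.search(pattern, f).groups() for f in fragments]
print(pairs)  # [('060372623011', '166'), ('060372655203', '70'), ('060377019021', '34')]
```

Note the extracted values are strings; cast the value column to an integer type afterwards if you need numeric counts.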

Upvotes: 2

Seb

Reputation: 539

You need to first parse the JSON as a Map&lt;String, Integer&gt; and then explode the map. You can do it like this:

import pyspark.sql.types as T
import pyspark.sql.functions as F

...

df2.withColumn("cbgs", F.from_json("cbgs", T.MapType(T.StringType(), T.IntegerType()))).select("id", F.explode("cbgs").alias('cbgs', 'value')).show()

Upvotes: 2
