Reputation: 57
My CSV file contains two columns. The sample data looks like the rows below in Notepad; each `cbgs` cell (e.g. cell B2) holds a JSON key/value object stored as a quoted string:
id,cbgs
sg:bd1f26e681264baaa4b44083891c886a,"{""060372623011"":166,""060372655203"":70,""060377019021"":34}"
sg:04c7f777f01c4c75bbd9e43180ce811f,"{""060372073012"":7}"
Now I am trying to convert it to the layout below:
id,cbgs,value
sg:bd1f26e681264baaa4b44083891c886a,060372623011,166
sg:bd1f26e681264baaa4b44083891c886a,060372655203,70
sg:bd1f26e681264baaa4b44083891c886a,060377019021,34
sg:04c7f777f01c4c75bbd9e43180ce811f,060372073012,7
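For reference, the desired flattening can be sanity-checked outside Spark with the standard library. This is only a plain-Python sketch of the target output (not a PySpark solution), assuming the file content shown above:

```python
import csv
import io
import json

# Sample data exactly as it appears in the file
# (JSON object stored as a quoted CSV field with doubled quotes)
raw = (
    'id,cbgs\n'
    'sg:bd1f26e681264baaa4b44083891c886a,"{""060372623011"":166,""060372655203"":70,""060377019021"":34}"\n'
    'sg:04c7f777f01c4c75bbd9e43180ce811f,"{""060372073012"":7}"\n'
)

rows = []
for rec in csv.DictReader(io.StringIO(raw)):
    # The csv module un-doubles the embedded quotes, leaving valid JSON
    for key, value in json.loads(rec['cbgs']).items():
        rows.append((rec['id'], key, value))
```

Each `(id, key, value)` tuple in `rows` corresponds to one row of the desired output.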
What I have tried:
1. Attempt 1:
from pyspark.sql.functions import udf, explode
import json
fifa_df = spark.read.csv("D:\\1. Work\\Safegraph\\Sample Files\\Los Angels\\csv.csv", inferSchema = True, header = True)
fifa_df.printSchema()
df2.select("item",explode(parse("cbgs")).alias("recom_item","recom_cnt")).show()
Error message:
cannot resolve 'item' given input columns: [id, cbgs, recom_item, recom_cnt];;
2. Per DrChess's suggestion I tried the code below, but I get an empty result:
fifa_df.withColumn("cbgs", F.from_json("cbgs", T.MapType(T.StringType(), T.IntegerType()))).select("id", F.explode(["visitor_home_cbgs"]).alias('cbgs', 'value')).show()
+------------------+----+-----+
|safegraph_place_id|cbgs|value|
+------------------+----+-----+
+------------------+----+-----+
Upvotes: 3
Views: 1610
Reputation: 169
Here is what I followed. It involves only string-handling operations, not complex data-type handling.
1. Read the file with the escape option set to ":
from pyspark.sql.functions import split, explode, regexp_extract

df=spark.read.format('csv').option('header','True').option('escape','"').load('csv.csv')
+-----------------------------------+--------------------------------------------------------+
|id |cbgs |
+-----------------------------------+--------------------------------------------------------+
|sg:bd1f26e681264baaa4b44083891c886a|{"060372623011":166,"060372655203":70,"060377019021":34}|
|sg:04c7f777f01c4c75bbd9e43180ce811f|{"060372073012":7} |
+-----------------------------------+--------------------------------------------------------+
2. Split the column on commas:
df=df.withColumn('cbgs',split(df['cbgs'],','))
+-----------------------------------+------------------------------------------------------------+
|id |cbgs |
+-----------------------------------+------------------------------------------------------------+
|sg:bd1f26e681264baaa4b44083891c886a|[{"060372623011":166, "060372655203":70, "060377019021":34}]|
|sg:04c7f777f01c4c75bbd9e43180ce811f|[{"060372073012":7}] |
+-----------------------------------+------------------------------------------------------------+
3. Explode the resulting array:
df=df.withColumn('cbgs',explode(df['cbgs']))
+-----------------------------------+-------------------+
|id |cbgs |
+-----------------------------------+-------------------+
|sg:bd1f26e681264baaa4b44083891c886a|{"060372623011":166|
|sg:bd1f26e681264baaa4b44083891c886a|"060372655203":70 |
|sg:bd1f26e681264baaa4b44083891c886a|"060377019021":34} |
|sg:04c7f777f01c4c75bbd9e43180ce811f|{"060372073012":7} |
+-----------------------------------+-------------------+
4. Extract the key and the value with a regex:
df=df.select(df['id'],regexp_extract(df['cbgs'],r'(\d+)":(\d+)',1).alias('cbgs'),regexp_extract(df['cbgs'],r'(\d+)":(\d+)',2).alias('value'))
+-----------------------------------+------------+-----+
|id |cbgs |value|
+-----------------------------------+------------+-----+
|sg:bd1f26e681264baaa4b44083891c886a|060372623011|166 |
|sg:bd1f26e681264baaa4b44083891c886a|060372655203|70 |
|sg:bd1f26e681264baaa4b44083891c886a|060377019021|34 |
|sg:04c7f777f01c4c75bbd9e43180ce811f|060372073012|7 |
+-----------------------------------+------------+-----+
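As a quick check of the pattern outside Spark, applying the same regex with Python's re module to the exploded fragments recovers the key/value pairs. This is a plain-Python sketch; the fragment strings are taken from the table after step 3:

```python
import re

# Fragments as they appear after the split/explode steps
fragments = [
    '{"060372623011":166',
    '"060372655203":70',
    '"060377019021":34}',
    '{"060372073012":7}',
]

# Same pattern passed to regexp_extract: digits, a closing quote,
# a colon, then the integer value
pattern = re.compile(r'(\d+)":(\d+)')
pairs = [pattern.search(f).groups() for f in fragments]
```

Group 1 is the CBG key and group 2 is the count, matching the `cbgs` and `value` columns above.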
Upvotes: 2
Reputation: 539
You need to first parse the JSON as a Map<String, Integer> and then explode the map. You can do it like this:
import pyspark.sql.types as T
import pyspark.sql.functions as F
...
df2.withColumn("cbgs", F.from_json("cbgs", T.MapType(T.StringType(), T.IntegerType()))).select("id", F.explode("cbgs").alias('cbgs', 'value')).show()
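Conceptually, from_json with a MapType turns each cell into a dict, and explode emits one row per map entry. The same effect in plain Python (a non-Spark sketch for illustration, using one cell from the sample data) is:

```python
import json

# One cbgs cell after CSV unquoting
cell = '{"060372623011":166,"060372655203":70,"060377019021":34}'

# from_json(cbgs, MapType(StringType(), IntegerType())) ~ parse the JSON object
parsed = json.loads(cell)

# explode(cbgs).alias('cbgs', 'value') ~ one (key, value) row per map entry
rows = list(parsed.items())
```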
Upvotes: 2