Reputation: 45
I have a PySpark dataframe as shown below:
+--------------------+---+
| _c0|_c1|
+--------------------+---+
|{"object":"F...| 0|
|{"object":"F...| 1|
|{"object":"F...| 2|
|{"object":"E...| 3|
|{"object":"F...| 4|
|{"object":"F...| 5|
|{"object":"F...| 6|
|{"object":"S...| 7|
|{"object":"F...| 8|
The column _c0 contains a string in dictionary (JSON) form, for example:
'{"object":"F","time":"2019-07-18T15:08:16.143Z","values":[0.22124142944812775,0.2147877812385559,0.16713131964206696,0.3102800250053406,0.31872493028640747,0.3366488814353943,0.25324496626853943,0.14537988603115082,0.12684473395347595,0.13864757120609283,0.15222792327404022,0.238663449883461,0.22896413505077362,0.237777978181839]}'
How can I convert this string to a dictionary, fetch each key-value pair, and store the values in variables? I don't want to convert the dataframe to pandas as it is expensive.
Upvotes: 2
Views: 7536
Reputation: 347
What I did
The column with the JSON string had values like
"{\"arm\": \"1\", \"wtvec\": [1, 1, 0.4], \"td\": \"current|0.01\", \"MABparam\": [[340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000]], \"seg\": \"c2\"}"
so I made a simple UDF:
import json
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def htl_id(x):
    try:
        return int(json.loads(x)['arm'])
    except Exception:
        raise Exception(x)

htlid_udf = udf(htl_id, IntegerType())
Then, to extract a column named 'arm' in my case:
cdm.withColumn('arm', htlid_udf(col('logString')))
Other answers make you define a schema and whatnot, and that wasn't cutting it for me.
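If you then need the extracted value in a plain Python variable (as the question asks), one option is to collect it to the driver. A minimal sketch, assuming the cdm dataframe and logString column from above:
first_row = cdm.withColumn('arm', htlid_udf(col('logString'))).select('arm').first()
arm = first_row['arm']  # plain Python int, usable as a regular variable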
Upvotes: 0
Reputation: 38267
df.rdd.map applies the given function to each row of data. I have not yet used the Python variant of Spark, but it could work like this:
import json

def wrangle(row):
    tmp = json.loads(row._c0)
    return (row._c1, tmp['object'], tmp['time'], tmp['values'])

df.rdd.map(wrangle).toDF()  # should yield a new frame/rdd with the object split
Addressing the resulting columns might work like that as well, but you seem to have figured that out already.
This loads the JSON-formatted string into a Python object and returns a tuple with the required elements. You may need to return a Row object instead of a tuple, but, as said above, I have not yet used the Python part of Spark.
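In case a named Row is needed, a sketch (untested, same caveat as above; wrangle_row is just a name I made up) could be:
from pyspark.sql import Row

def wrangle_row(row):
    # Parse the JSON string and build a named Row so toDF() picks up column names.
    tmp = json.loads(row._c0)
    return Row(_c1=row._c1, object=tmp['object'], time=tmp['time'], values=tmp['values'])

df.rdd.map(wrangle_row).toDF()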
Upvotes: 0
Reputation: 3224
Extending on @Jacek Laskowski's post:
First create the schema of the struct column. Then use from_json to convert the string column to a struct. Lastly, use the nested schema to extract the new columns (the f-strings below require Python 3.6). With the struct type you can use dot notation inside .select to operate on the nested structure.
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

schema = StructType([StructField("object", StringType()),
                     StructField("time", StringType()),
                     StructField("values", ArrayType(FloatType()))])

df = df.withColumn('_c0', f.from_json('_c0', schema))
select_list = ["_c0", "_c1"] + [f.col(f'_c0.{column}').alias(column) for column in ["object", "time", "values"]]
df.select(*select_list).show()
Output (just the first two rows):
+--------------------+---+------+--------------------+--------------------+
| _c0|_c1|object| time| values|
+--------------------+---+------+--------------------+--------------------+
|[F, 2019-07-18T15...| 0| F|2019-07-18T15:08:...|[0.22124143, 0.21...|
|[F, 2019-07-18T15...| 1| F|2019-07-18T15:08:...|[0.22124143, 0.21...|
+--------------------+---+------+--------------------+--------------------+
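As a side note (my own addition, not part of the original answer): if you simply want every nested field as a top-level column, the struct can also be expanded with the * syntax:
df.select("_c1", "_c0.*").show()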
Upvotes: 0
Reputation: 74759
You should use the PySpark equivalents of Scala's Dataset.withColumn and the from_json standard function.
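A minimal sketch of what that looks like in PySpark (column names taken from the question; the schema is my assumption about the JSON shown there):
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType

# Assumed schema matching the JSON string shown in the question.
json_schema = StructType([
    StructField("object", StringType()),
    StructField("time", StringType()),
    StructField("values", ArrayType(DoubleType())),
])

parsed = df.withColumn("parsed", from_json("_c0", json_schema))
parsed.select("_c1", "parsed.object", "parsed.time", "parsed.values").show()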
Upvotes: 1