william007

Reputation: 18547

How to extract value of json when doing pyspark query

This is what the table looks like:

[screenshot of the table: columns userid, region, json_data]

which I extract using the following command:

query="""
select
        distinct
        userid,
        region,
        json_data
    from mytable
    where
        operation = 'myvalue'
"""
table=spark.sql(query)

Now I wish to extract only the value of msg_id from the json_data column (which is a string column), with the following expected output:

[screenshot of the expected output: columns userid, region, msg_id]

How should I change the query in the code above to extract msg_id from json_data?

Note:

  1. The JSON format is not fixed (i.e., it may contain other fields), but the value I want to extract is always keyed by msg_id.

  2. I want to do this during retrieval for efficiency reasons, although I could retrieve json_data and parse it afterwards.

Upvotes: 3

Views: 8100

Answers (2)

murtihash

Reputation: 8410

Instead of reading a file to infer the schema, you can specify the schema explicitly using StructType/StructField syntax, DDL struct<> syntax, or schema_of_json, as shown below:

df.show()  # sample dataframe

#+------+------+-----------------------------------------+
#|userid|region|json_data                                |
#+------+------+-----------------------------------------+
#|1     |US    |{"msg_id":123}                           |
#|2     |US    |{"msg_id":123}                           |
#|3     |US    |{"msg_id":123}                           |
#|4     |US    |{"msg_id":123,"is_ads":true,"location":2}|
#|5     |US    |{"msg_id":456}                           |
#+------+------+-----------------------------------------+

from pyspark.sql import functions as F
from pyspark.sql.types import *

schema = StructType([StructField("msg_id", LongType(), True),
                     StructField("is_ads", BooleanType(), True),
                     StructField("location", LongType(), True)])

#OR

schema = 'struct<is_ads:boolean,location:bigint,msg_id:bigint>'

#OR

schema = df.select(F.schema_of_json("""{"msg_id":123,"is_ads":true,"location":2}""")).collect()[0][0]

df.withColumn("json_data", F.from_json("json_data",schema))\
  .select("userid","region","json_data.msg_id").show()

#+------+------+------+
#|userid|region|msg_id|
#+------+------+------+
#|     1|    US|   123|
#|     2|    US|   123|
#|     3|    US|   123|
#|     4|    US|   123|
#|     5|    US|   456|
#+------+------+------+

Upvotes: 3

srikanth holur

Reputation: 780

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
  StructField("a", StringType(), True),
  StructField("b", StringType(),  True),
  StructField("json", StringType(), True)
])

data = [("a","b",'{"msg_id":"123","msg":"test"}'),("c","d",'{"msg_id":"456","column1":"test"}')]

df = spark.createDataFrame(data,schema)
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df2 = df.withColumn('parsed', from_json(col('json'), json_schema))
df2.createOrReplaceTempView("test")
spark.sql("select a,b,parsed.msg_id from test").show()

OUTPUT >>> 
+---+---+------+
|  a|  b|msg_id|
+---+---+------+
|  a|  b|   123|
|  c|  d|   456|
+---+---+------+

Upvotes: 3
