Reputation: 18547
This is what the table looks like, extracted using the following command:
query="""
select
distinct
userid,
region,
json_data
from mytable
where
operation = 'myvalue'
"""
table=spark.sql(query)
Now I wish to extract only the value of msg_id
from the json_data column
(which is a string column), with the following expected output:
How should I change the query
in the above code to extract msg_id from json_data?
Note:
The JSON format is not fixed (i.e., it may contain other fields), but the value I want to extract always has the key msg_id
.
I want to do this during retrieval for efficiency reasons, though I could retrieve json_data
and format it afterwards.
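For context, the per-row extraction being asked for can be sketched in plain Python (outside Spark). This is a minimal illustration, not Spark code: json.loads tolerates the varying extra fields, assuming each json_data value is a valid JSON object that always carries msg_id:

```python
import json

# Sample rows mimicking the (userid, region, json_data) table;
# the JSON objects carry varying extra fields, but always msg_id.
rows = [
    (1, "US", '{"msg_id": 123}'),
    (4, "US", '{"msg_id": 123, "is_ads": true, "location": 2}'),
    (5, "US", '{"msg_id": 456}'),
]

def extract_msg_id(json_str):
    # .get returns None rather than raising if msg_id were ever absent
    return json.loads(json_str).get("msg_id")

result = [(uid, region, extract_msg_id(js)) for uid, region, js in rows]
print(result)  # [(1, 'US', 123), (4, 'US', 123), (5, 'US', 456)]
```

Inside Spark SQL itself, the same single-field extraction during retrieval can be done with get_json_object(json_data, '$.msg_id') directly in the query, which avoids declaring a full schema for the JSON column.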
Upvotes: 3
Views: 8100
Reputation: 8410
Instead of reading the file to infer the schema, you can specify the schema explicitly using StructType/StructField syntax,
the struct<> DDL string syntax,
or schema_of_json,
as shown below:
df.show()  # sample dataframe
#+------+------+-----------------------------------------+
#|userid|region|json_data |
#+------+------+-----------------------------------------+
#|1 |US |{"msg_id":123} |
#|2 |US |{"msg_id":123} |
#|3 |US |{"msg_id":123} |
#|4 |US |{"msg_id":123,"is_ads":true,"location":2}|
#|5 |US |{"msg_id":456} |
#+------+------+-----------------------------------------+
from pyspark.sql import functions as F
from pyspark.sql.types import *
schema = StructType([StructField("msg_id", LongType(), True),
StructField("is_ads", BooleanType(), True),
StructField("location", LongType(), True)])
# OR
schema = 'struct<is_ads:boolean,location:bigint,msg_id:bigint>'
# OR
schema = df.select(F.schema_of_json("""{"msg_id":123,"is_ads":true,"location":2}""")).collect()[0][0]
df.withColumn("json_data", F.from_json("json_data",schema))\
.select("userid","region","json_data.msg_id").show()
#+------+------+------+
#|userid|region|msg_id|
#+------+------+------+
#| 1| US| 123|
#| 2| US| 123|
#| 3| US| 123|
#| 4| US| 123|
#| 5| US| 456|
#+------+------+------+
Upvotes: 3
Reputation: 780
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.getOrCreate()
schema = StructType([
StructField("a", StringType(), True),
StructField("b", StringType(), True),
StructField("json", StringType(), True)
])
data = [("a","b",'{"msg_id":"123","msg":"test"}'),("c","d",'{"msg_id":"456","column1":"test"}')]
df = spark.createDataFrame(data,schema)
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df2 = df.withColumn('parsed', from_json(col('json'), json_schema))
df2.createOrReplaceTempView("test")
spark.sql("select a,b,parsed.msg_id from test").show()
OUTPUT >>>
+---+---+------+
| a| b|msg_id|
+---+---+------+
| a| b| 123|
| c| d| 456|
+---+---+------+
Upvotes: 3
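The spark.read.json call above scans every row to infer a schema that is the union of all fields seen across the JSON strings. As a rough pure-Python illustration of that union behaviour (a hypothetical helper for intuition, not Spark's actual implementation):

```python
import json

def infer_union_fields(json_strings):
    # Collect the union of top-level keys across all JSON objects,
    # mirroring how row-by-row schema inference merges fields.
    fields = set()
    for s in json_strings:
        fields.update(json.loads(s).keys())
    return sorted(fields)

docs = ['{"msg_id":"123","msg":"test"}', '{"msg_id":"456","column1":"test"}']
print(infer_union_fields(docs))  # ['column1', 'msg', 'msg_id']
```

Because this inference requires a full pass over the data, supplying an explicit schema (as in the first answer) or using get_json_object for a single field is cheaper when only msg_id is needed.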