user14681827

Reputation: 153

Split column with JSON string to columns each containing one key-value pair from the string

I have a data frame that looks like this (one column named "value" with a JSON string in it). I send it to an Event Hub using the Kafka API, and then I want to read that data back from the Event Hub and apply some transformations to it. The data is received in binary format, as described in the Kafka documentation.

Here are a few rows in CSV format:

value
"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8"",""latitude"":""34.5016064725731"",""longitude"":""123.43996453687777""}"
"{""id"":""32782100-9b59-49c7-9d56-bb4dfc368a86"",""latitude"":""49.938541626415144"",""longitude"":""111.88360885971986""}"
"{""id"":""a72a600f-2b99-4c41-a388-9a24c00545c0"",""latitude"":""4.988768300413497"",""longitude"":""-141.92727675177588""}"
"{""id"":""5a5f056a-cdfd-4957-8e84-4d5271253509"",""latitude"":""41.802942545247134"",""longitude"":""90.45164573613573""}"
"{""id"":""d00d0926-46eb-45dd-9e35-ab765804340d"",""latitude"":""70.60161063520081"",""longitude"":""20.566520665122482""}"
"{""id"":""dda14397-6922-4bb6-9be3-a1546f08169d"",""latitude"":""68.400462882435"",""longitude"":""135.7167027587489""}"
"{""id"":""c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea"",""latitude"":""26.04757722355835"",""longitude"":""175.20227554031783""}"
"{""id"":""97f8f1cf-3aa0-49bb-b3d5-05b736e0c883"",""latitude"":""35.52624182094499"",""longitude"":""-164.18066699972852""}"
"{""id"":""6bed49bc-ee93-4ed9-893f-4f51c7b7f703"",""latitude"":""-24.319581484353847"",""longitude"":""85.27338980948076""}"

What I want to do is apply a transformation and create a data frame with three columns: one with id, one with latitude and one with longitude. This is what I tried, but the result is not what I expected:

from pyspark.sql.types import StructType
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F

# df is the data frame received from Kafka
location_schema = StructType().add("id", "string").add("latitude", "float").add("longitude", "float")
string_df = df.selectExpr("CAST(value AS STRING)").withColumn("value", from_json(F.col("value"), location_schema))
string_df.printSchema()
string_df.show()

The result is a single "value" column that holds a struct, rather than separate columns. Any idea how to obtain three separate columns, as described above?

Upvotes: 2

Views: 2074

Answers (5)

Sachin Tiwari

Reputation: 342

If the pattern remains unchanged, you can use regexp_replace():

>>> df = spark.read.option("header",False).option("inferSchema",True).csv("/dir1/dir2/Sample2.csv")
>>> df.show(truncate=False)
+-------------------------------------------------+------------------------------------+---------------------------------------+
|_c0                                              |_c1                                 |_c2                                    |
+-------------------------------------------------+------------------------------------+---------------------------------------+
|"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8""|""latitude"":""34.5016064725731""   |""longitude"":""123.43996453687777""}" |
|"{""id"":""32782100-9b59-49c7-9d56-bb4dfc368a86""|""latitude"":""49.938541626415144"" |""longitude"":""111.88360885971986""}" |
|"{""id"":""a72a600f-2b99-4c41-a388-9a24c00545c0""|""latitude"":""4.988768300413497""  |""longitude"":""-141.92727675177588""}"|
|"{""id"":""5a5f056a-cdfd-4957-8e84-4d5271253509""|""latitude"":""41.802942545247134"" |""longitude"":""90.45164573613573""}"  |
|"{""id"":""d00d0926-46eb-45dd-9e35-ab765804340d""|""latitude"":""70.60161063520081""  |""longitude"":""20.566520665122482""}" |
|"{""id"":""dda14397-6922-4bb6-9be3-a1546f08169d""|""latitude"":""68.400462882435""    |""longitude"":""135.7167027587489""}"  |
|"{""id"":""c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea""|""latitude"":""26.04757722355835""  |""longitude"":""175.20227554031783""}" |
|"{""id"":""97f8f1cf-3aa0-49bb-b3d5-05b736e0c883""|""latitude"":""35.52624182094499""  |""longitude"":""-164.18066699972852""}"|
|"{""id"":""6bed49bc-ee93-4ed9-893f-4f51c7b7f703""|""latitude"":""-24.319581484353847""|""longitude"":""85.27338980948076""}"  |
+-------------------------------------------------+------------------------------------+---------------------------------------+

>>> from pyspark.sql.functions import regexp_replace
>>> (df.withColumn("id", regexp_replace('_c0', '\"\{\"\"id\"\":\"\"', ''))
...    .withColumn("id", regexp_replace('id', '\"\"', ''))
...    .withColumn("latitude", regexp_replace('_c1', '\"\"latitude\"\":\"\"', ''))
...    .withColumn("latitude", regexp_replace('latitude', '\"\"', ''))
...    .withColumn("longitude", regexp_replace('_c2', '\"\"longitude\"\":\"\"', ''))
...    .withColumn("longitude", regexp_replace('longitude', '\"\"\}\"', ''))
...    .drop("_c0", "_c1", "_c2")
...    .show())
+--------------------+-------------------+-------------------+
|                  id|           latitude|          longitude|
+--------------------+-------------------+-------------------+
|e52f247c-f46c-402...|   34.5016064725731| 123.43996453687777|
|32782100-9b59-49c...| 49.938541626415144| 111.88360885971986|
|a72a600f-2b99-4c4...|  4.988768300413497|-141.92727675177588|
|5a5f056a-cdfd-495...| 41.802942545247134|  90.45164573613573|
|d00d0926-46eb-45d...|  70.60161063520081| 20.566520665122482|
|dda14397-6922-4bb...|    68.400462882435|  135.7167027587489|
|c7f13b8a-3468-4bc...|  26.04757722355835| 175.20227554031783|
|97f8f1cf-3aa0-49b...|  35.52624182094499|-164.18066699972852|
|6bed49bc-ee93-4ed...|-24.319581484353847|  85.27338980948076|
+--------------------+-------------------+-------------------+
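
As an aside, the doubled quotes in the sample are standard CSV escaping, so a CSV parser that honours quoting should hand back each row as one JSON string instead of three fragments. The following is a minimal sketch with Python's standard csv and json modules (not Spark), using one row of the sample from the question. The variable names are illustrative only.

```python
import csv
import io
import json

# One row of the question's CSV sample, preceded by its header line.
raw_csv = (
    'value\n'
    '"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8"",'
    '""latitude"":""34.5016064725731"",'
    '""longitude"":""123.43996453687777""}"\n'
)

# Inside a quoted CSV field, "" stands for one literal double quote,
# and commas do not split the field.
rows = list(csv.DictReader(io.StringIO(raw_csv)))

# After CSV unescaping, the field is an ordinary JSON object.
record = json.loads(rows[0]["value"])

print(record["id"])
print(record["latitude"], record["longitude"])
```

In Spark, setting the CSV reader's escape option to a double quote would presumably play the same role, though that is an assumption not verified here.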

Upvotes: 1

Saideep Arikontham

Reputation: 6114

I used the sample data provided, created a dataframe called df and proceeded to use the same method as you.

  • The df dataframe contains the sample rows shown in the question.

  • The fields are not parsed as expected because of their data type. In the JSON, the latitude and longitude values are quoted, so they are strings, but the schema location_schema declares them as float. Instead, declare them as string and convert them to double afterwards. The code looks like this:
location_schema = StructType().add("id", "string").add("latitude", "string").add("longitude", "string")
string_df = df.selectExpr('CAST(value AS STRING)').withColumn("value", from_json(F.col("value"), location_schema))
string_df.printSchema()
string_df.show(truncate=False)

  • Now use DataFrame.withColumn(), Column.withField() and cast() to convert the string fields latitude and longitude to double.
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Column.withField() requires Spark 3.1 or later.
string_df = string_df.withColumn("value", col("value").withField("latitude", col("value.latitude").cast(DoubleType())))\
            .withColumn("value", col("value").withField("longitude", col("value.longitude").cast(DoubleType())))

string_df.printSchema()
string_df.show(truncate=False)
  • The value column now holds a struct whose latitude and longitude fields are of double type.

Update:

  • To get separate columns, you can use the json_tuple() function. Alternatively, selecting "value.*" from string_df expands the struct fields into top-level columns. Refer to the official Spark documentation:

pyspark.sql.functions.json_tuple — PySpark 3.3.0 documentation (apache.org)
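
The type mismatch described earlier in this answer can be checked outside Spark. The sketch below uses only Python's standard json module on one record from the question. It shows that the coordinates arrive as JSON strings, and it mirrors the "read as string, then convert" order suggested above. It is an illustration of the data, not of Spark's own parsing.

```python
import json

# One record from the question's sample data.
sample = (
    '{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8",'
    '"latitude":"34.5016064725731",'
    '"longitude":"123.43996453687777"}'
)
record = json.loads(sample)

# The coordinates are quoted in the JSON, so they parse as str, not float.
print(type(record["latitude"]).__name__)

# Read as text first, then convert explicitly (the analogue of a
# string schema followed by a cast to double).
latitude = float(record["latitude"])
longitude = float(record["longitude"])
print(latitude, longitude)
```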

Upvotes: 0

ZygD

Reputation: 24478

It's easy to extract a JSON string into columns using inline and from_json:

df = spark.createDataFrame(
    [('{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}',)],
    ['value'])

df = df.selectExpr(
    "inline(array(from_json(value, 'struct<id:string, latitude:string, longitude:string>')))"
)

df.show(truncate=0)
# +------------------------------------+----------------+------------------+
# |id                                  |latitude        |longitude         |
# +------------------------------------+----------------+------------------+
# |e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731|123.43996453687777|
# +------------------------------------+----------------+------------------+

Upvotes: 0

ZygD

Reputation: 24478

You can use json_tuple to extract values from a JSON string.

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}',)],
    ['value'])

Script:

cols = ['id', 'latitude', 'longitude']
df = df.select(F.json_tuple('value', *cols)).toDF(*cols)
df.show(truncate=0)
# +------------------------------------+----------------+------------------+
# |id                                  |latitude        |longitude         |
# +------------------------------------+----------------+------------------+
# |e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731|123.43996453687777|
# +------------------------------------+----------------+------------------+

If needed, cast to double:

df = (df
    .withColumn('latitude', F.col('latitude').cast('double'))
    .withColumn('longitude', F.col('longitude').cast('double')))
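
On the choice of double over float for the cast above: Spark's float type is 32-bit, while double is 64-bit, and coordinates with this many digits do not survive a 32-bit round trip. The following sketch shows the effect in plain Python with the standard struct module. The helper to_float32 is a hypothetical name introduced for this illustration, not a Spark function.

```python
import struct


def to_float32(x: float) -> float:
    """Round a 64-bit Python float to the nearest 32-bit float and back."""
    return struct.unpack("f", struct.pack("f", x))[0]


# A latitude from the question's sample, parsed at 64-bit precision.
latitude = float("34.5016064725731")
as_float32 = to_float32(latitude)

print(latitude)                    # value at double precision
print(as_float32)                  # value after a 32-bit round trip
print(abs(latitude - as_float32))  # precision lost by using 32 bits
```

The difference is small, but it is enough to change the trailing digits, which is one reason to prefer double for latitude and longitude.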

Upvotes: 0

Luiz Viola

Reputation: 2436

Your df:

df = spark.createDataFrame(
    [
(1, '{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}'),
(2, '{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}'),
(3, '{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}'),
(4, '{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"}'),
(5, '{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"}'),
(6, '{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"}'),
(7, '{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"}'),
(8, '{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}'),
(9, '{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}')
    ],
    ['id', 'value']
).drop('id')

+--------------------------------------------------------------------------------------------------------------+
|value                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}  |
|{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}|
|{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}|
|{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"} |
|{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"} |
|{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"}    |
|{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"} |
|{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}|
|{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}|
+--------------------------------------------------------------------------------------------------------------+

Then:

from pyspark.sql import functions as F
from pyspark.sql.types import *

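json_schema = StructType([
                          # The JSON values are quoted, so read them as strings;
                          # cast to double afterwards if numeric columns are needed.
                          StructField("id", StringType(), True),
                          StructField("latitude", StringType(), True),
                          StructField("longitude", StringType(), True)
                         ])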

df\
    .withColumn('json', F.from_json(F.col('value'), json_schema))\
    .select(F.col('json').getItem('id').alias('id'),
            F.col('json').getItem('latitude').alias('latitude'),
            F.col('json').getItem('longitude').alias('longitude')
           )\
    .show(truncate=False)

+------------------------------------+-------------------+-------------------+
|id                                  |latitude           |longitude          |
+------------------------------------+-------------------+-------------------+
|e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731   |123.43996453687777 |
|32782100-9b59-49c7-9d56-bb4dfc368a86|49.938541626415144 |111.88360885971986 |
|a72a600f-2b99-4c41-a388-9a24c00545c0|4.988768300413497  |-141.92727675177588|
|5a5f056a-cdfd-4957-8e84-4d5271253509|41.802942545247134 |90.45164573613573  |
|d00d0926-46eb-45dd-9e35-ab765804340d|70.60161063520081  |20.566520665122482 |
|dda14397-6922-4bb6-9be3-a1546f08169d|68.400462882435    |135.7167027587489  |
|c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea|26.04757722355835  |175.20227554031783 |
|97f8f1cf-3aa0-49bb-b3d5-05b736e0c883|35.52624182094499  |-164.18066699972852|
|6bed49bc-ee93-4ed9-893f-4f51c7b7f703|-24.319581484353847|85.27338980948076  |
+------------------------------------+-------------------+-------------------+

Upvotes: 1
