Reputation: 153
I have a data frame that looks like this (one column named "value" with a JSON string in it). I send it to an Event Hub using Kafka API and then I want to read that data from the Event Hub and apply some transformations to it. The data is in received in binary format, as described in the Kafka documentation.
Here are a few columns in CSV format:
value
"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8"",""latitude"":""34.5016064725731"",""longitude"":""123.43996453687777""}"
"{""id"":""32782100-9b59-49c7-9d56-bb4dfc368a86"",""latitude"":""49.938541626415144"",""longitude"":""111.88360885971986""}"
"{""id"":""a72a600f-2b99-4c41-a388-9a24c00545c0"",""latitude"":""4.988768300413497"",""longitude"":""-141.92727675177588""}"
"{""id"":""5a5f056a-cdfd-4957-8e84-4d5271253509"",""latitude"":""41.802942545247134"",""longitude"":""90.45164573613573""}"
"{""id"":""d00d0926-46eb-45dd-9e35-ab765804340d"",""latitude"":""70.60161063520081"",""longitude"":""20.566520665122482""}"
"{""id"":""dda14397-6922-4bb6-9be3-a1546f08169d"",""latitude"":""68.400462882435"",""longitude"":""135.7167027587489""}"
"{""id"":""c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea"",""latitude"":""26.04757722355835"",""longitude"":""175.20227554031783""}"
"{""id"":""97f8f1cf-3aa0-49bb-b3d5-05b736e0c883"",""latitude"":""35.52624182094499"",""longitude"":""-164.18066699972852""}"
"{""id"":""6bed49bc-ee93-4ed9-893f-4f51c7b7f703"",""latitude"":""-24.319581484353847"",""longitude"":""85.27338980948076""}"
What I want to do is to apply a transformation and create a data frame with 3 columns one with id, one with latitude and one with longitude. This is what I tried but the result is not what I expected:
from pyspark.sql.types import StructType
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
# df is the data frame received from Kafka
location_schema = StructType().add("id", "string").add("latitude", "float").add("longitude", "float")
string_df = df.selectExpr("CAST(value AS STRING)").withColumn("value", from_json(F.col("value"), location_schema))
string_df.printSchema()
string_df.show()
And this is the result:
It created a "value" column with a structure as a value. Any idea what to do to obtain 3 different columns, as I described?
Upvotes: 2
Views: 2074
Reputation: 342
If pattern remains unchanged then you can use regexp_replace()
>>> df = spark.read.option("header",False).option("inferSchema",True).csv("/dir1/dir2/Sample2.csv")
>>> df.show(truncate=False)
+-------------------------------------------------+------------------------------------+---------------------------------------+
|_c0 |_c1 |_c2 |
+-------------------------------------------------+------------------------------------+---------------------------------------+
|"{""id"":""e52f247c-f46c-4021-bc62-e28e56db1ad8""|""latitude"":""34.5016064725731"" |""longitude"":""123.43996453687777""}" |
|"{""id"":""32782100-9b59-49c7-9d56-bb4dfc368a86""|""latitude"":""49.938541626415144"" |""longitude"":""111.88360885971986""}" |
|"{""id"":""a72a600f-2b99-4c41-a388-9a24c00545c0""|""latitude"":""4.988768300413497"" |""longitude"":""-141.92727675177588""}"|
|"{""id"":""5a5f056a-cdfd-4957-8e84-4d5271253509""|""latitude"":""41.802942545247134"" |""longitude"":""90.45164573613573""}" |
|"{""id"":""d00d0926-46eb-45dd-9e35-ab765804340d""|""latitude"":""70.60161063520081"" |""longitude"":""20.566520665122482""}" |
|"{""id"":""dda14397-6922-4bb6-9be3-a1546f08169d""|""latitude"":""68.400462882435"" |""longitude"":""135.7167027587489""}" |
|"{""id"":""c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea""|""latitude"":""26.04757722355835"" |""longitude"":""175.20227554031783""}" |
|"{""id"":""97f8f1cf-3aa0-49bb-b3d5-05b736e0c883""|""latitude"":""35.52624182094499"" |""longitude"":""-164.18066699972852""}"|
|"{""id"":""6bed49bc-ee93-4ed9-893f-4f51c7b7f703""|""latitude"":""-24.319581484353847""|""longitude"":""85.27338980948076""}" |
+-------------------------------------------------+------------------------------------+---------------------------------------+
>>> df.withColumn("id",regexp_replace('_c0','\"\{\"\"id\"\":\"\"','')).withColumn("id",regexp_replace('id','\"\"','')).withColumn("latitude",regexp_replace('_c1','\"\"latitude\"\":\"\"','')).withColumn("latitude",regexp_replace('latitude','\"\"','')).withColumn("longitude",regexp_replace('_c2','\"\"longitude\"\":\"\"','')).withColumn("longitude",regexp_replace('longitude','\"\"\}\"','')).drop("_c0").drop("_c1").drop("_c2").show()
+--------------------+-------------------+-------------------+
| id| latitude| longitude|
+--------------------+-------------------+-------------------+
|e52f247c-f46c-402...| 34.5016064725731| 123.43996453687777|
|32782100-9b59-49c...| 49.938541626415144| 111.88360885971986|
|a72a600f-2b99-4c4...| 4.988768300413497|-141.92727675177588|
|5a5f056a-cdfd-495...| 41.802942545247134| 90.45164573613573|
|d00d0926-46eb-45d...| 70.60161063520081| 20.566520665122482|
|dda14397-6922-4bb...| 68.400462882435| 135.7167027587489|
|c7f13b8a-3468-4bc...| 26.04757722355835| 175.20227554031783|
|97f8f1cf-3aa0-49b...| 35.52624182094499|-164.18066699972852|
|6bed49bc-ee93-4ed...|-24.319581484353847| 85.27338980948076|
+--------------------+-------------------+-------------------+
Upvotes: 1
Reputation: 6114
I used the sample data provided, created a dataframe called df
and proceeded to use the same method as you.
df
dataframe.latitude
and longitude
are present as string
types in the dataframe df
. But while creating the schema location_schema
you have specified their type as float
. Instead, try changing their type to string
and later convert them to double type
. The code looks as shown below:location_schema = StructType().add("id", "string").add("latitude", "string").add("longitude", "string")
string_df = df.selectExpr('CAST(value AS STRING)').withColumn("value", from_json(F.col("value"), location_schema))
string_df.printSchema()
string_df.show(truncate=False)
DataFrame.withColumn(), Column.withField() and cast()
convert the string type fields latitude and longitude to Double Type
.string_df = string_df.withColumn("value", col("value").withField("latitude", col("value.latitude").cast(DoubleType())))\
.withColumn("value", col("value").withField("longitude", col("value.longitude").cast(DoubleType())))
string_df.printSchema()
string_df.show(truncate=False)
Update:
json_tuple()
method. Refer to this official spark documentation:pyspark.sql.functions.json_tuple — PySpark 3.3.0 documentation (apache.org)
Upvotes: 0
Reputation: 24478
It's easy to extract JSON string as columns using inline
and from_json
df = spark.createDataFrame(
[('{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}',)],
['value'])
df = df.selectExpr(
"inline(array(from_json(value, 'struct<id:string, latitude:string, longitude:string>')))"
)
df.show(truncate=0)
# +------------------------------------+----------------+------------------+
# |id |latitude |longitude |
# +------------------------------------+----------------+------------------+
# |e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731|123.43996453687777|
# +------------------------------------+----------------+------------------+
Upvotes: 0
Reputation: 24478
You can use json_tuple
to extract values from JSON string.
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}',)],
['value'])
Script:
cols = ['id', 'latitude', 'longitude']
df = df.select(F.json_tuple('value', *cols)).toDF(*cols)
df.show(truncate=0)
# +------------------------------------+----------------+------------------+
# |id |latitude |longitude |
# +------------------------------------+----------------+------------------+
# |e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731|123.43996453687777|
# +------------------------------------+----------------+------------------+
If needed, cast
to double:
.withColumn('latitude', F.col('latitude').cast('double'))
.withColumn('longitude', F.col('longitude').cast('double'))
Upvotes: 0
Reputation: 2436
Your df:
df = spark.createDataFrame(
[
(1, '{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"}'),
(2, '{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}'),
(3, '{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}'),
(4, '{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"}'),
(5, '{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"}'),
(6, '{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"}'),
(7, '{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"}'),
(8, '{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}'),
(9, '{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}')
],
['id', 'value']
).drop('id')
+--------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------+
|{"id":"e52f247c-f46c-4021-bc62-e28e56db1ad8","latitude":"34.5016064725731","longitude":"123.43996453687777"} |
|{"id":"32782100-9b59-49c7-9d56-bb4dfc368a86","latitude":"49.938541626415144","longitude":"111.88360885971986"}|
|{"id":"a72a600f-2b99-4c41-a388-9a24c00545c0","latitude":"4.988768300413497","longitude":"-141.92727675177588"}|
|{"id":"5a5f056a-cdfd-4957-8e84-4d5271253509","latitude":"41.802942545247134","longitude":"90.45164573613573"} |
|{"id":"d00d0926-46eb-45dd-9e35-ab765804340d","latitude":"70.60161063520081","longitude":"20.566520665122482"} |
|{"id":"dda14397-6922-4bb6-9be3-a1546f08169d","latitude":"68.400462882435","longitude":"135.7167027587489"} |
|{"id":"c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea","latitude":"26.04757722355835","longitude":"175.20227554031783"} |
|{"id":"97f8f1cf-3aa0-49bb-b3d5-05b736e0c883","latitude":"35.52624182094499","longitude":"-164.18066699972852"}|
|{"id":"6bed49bc-ee93-4ed9-893f-4f51c7b7f703","latitude":"-24.319581484353847","longitude":"85.27338980948076"}|
+--------------------------------------------------------------------------------------------------------------+
Then:
from pyspark.sql import functions as F
from pyspark.sql.types import *
json_schema = StructType([
StructField("id", StringType(), True),
StructField("latitude", FloatType(), True),
StructField("longitude", FloatType(), True)
])
df\
.withColumn('json', F.from_json(F.col('value'), json_schema))\
.select(F.col('json').getItem('id').alias('id'),
F.col('json').getItem('latitude').alias('latitude'),
F.col('json').getItem('longitude').alias('longitude')
)\
.show(truncate=False)
+------------------------------------+-------------------+-------------------+
|id |latitude |longitude |
+------------------------------------+-------------------+-------------------+
|e52f247c-f46c-4021-bc62-e28e56db1ad8|34.5016064725731 |123.43996453687777 |
|32782100-9b59-49c7-9d56-bb4dfc368a86|49.938541626415144 |111.88360885971986 |
|a72a600f-2b99-4c41-a388-9a24c00545c0|4.988768300413497 |-141.92727675177588|
|5a5f056a-cdfd-4957-8e84-4d5271253509|41.802942545247134 |90.45164573613573 |
|d00d0926-46eb-45dd-9e35-ab765804340d|70.60161063520081 |20.566520665122482 |
|dda14397-6922-4bb6-9be3-a1546f08169d|68.400462882435 |135.7167027587489 |
|c7f13b8a-3468-4bc6-9db4-e0b1b34bf9ea|26.04757722355835 |175.20227554031783 |
|97f8f1cf-3aa0-49bb-b3d5-05b736e0c883|35.52624182094499 |-164.18066699972852|
|6bed49bc-ee93-4ed9-893f-4f51c7b7f703|-24.319581484353847|85.27338980948076 |
+------------------------------------+-------------------+-------------------+
Upvotes: 1