Reputation: 199
I am trying to process a JSON column from a PostgreSQL database. I am able to connect to the database using:
import os
import pyspark
import findspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
findspark.init(os.environ['SPARK_HOME'])
# DB credentials
user = os.environ['EVENTS_DEV_UID']
password = os.environ['EVENTS_DEV_PWD']
host = os.environ['EVENTS_DEV_HOST']
port = os.environ['EVENTS_DEV_PORT']
db = os.environ['EVENTS_DEV_DBNAME']
# Initiate spark session
sc = SparkContext()
spark = SQLContext(sc)
# Set properties
properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}
# Load data
df = spark.read.jdbc(
    url='jdbc:postgresql://' + host + ':' + port + '/' + db,
    table='events',
    properties=properties)
The problem starts when casting the JSON field: Spark doesn't recognize the struct format of params. When I print the schema:
df.printSchema()
root
|-- time: timestamp (nullable = true)
|-- name: string (nullable = true)
|-- params: string (nullable = true)
When I try to cast the string to a struct:
df = df.withColumn('params', df.params.cast('struct'))
I get the following error:
ParseException: '\nDataType struct is not supported.(line 1, pos 0)\n\n== SQL ==\nstruct\n^^^\n'
I guess the problem is the escape characters. Does anybody have an idea how to proceed?
Upvotes: 1
Views: 1873
Reputation: 10096
"struct"
is not a valid casting type. You can define your own UDF
using python's json.loads
function.
Let's start with a sample data frame:
df = sc.parallelize([[1, "a", "{\"a\":1, \"b\":2}"], [2, "b", "{\"a\":3, \"b\":4}"]])\
.toDF(["col1", "col2", "json_col"])
df.show()
+----+----+--------------+
|col1|col2| json_col|
+----+----+--------------+
| 1| a|{"a":1, "b":2}|
| 2| b|{"a":3, "b":4}|
+----+----+--------------+
The output StructType would then have the following schema:
from pyspark.sql.types import IntegerType, StructField, StructType
schema = StructType([StructField("a", IntegerType()), StructField("b", IntegerType())])
You cannot cast StringType to StructType, hence the UDF:
import pyspark.sql.functions as psf
import json
# UDF that parses a JSON string into the struct defined by `schema`
json_load = psf.udf(json.loads, schema)
Now we can process json_col:
df_parsed = df.withColumn("parsed_json", json_load("json_col"))
df_parsed.show()
df_parsed.printSchema()
+----+----+--------------+-----------+
|col1|col2| json_col|parsed_json|
+----+----+--------------+-----------+
| 1| a|{"a":1, "b":2}| [1,2]|
| 2| b|{"a":3, "b":4}| [3,4]|
+----+----+--------------+-----------+
root
|-- col1: long (nullable = true)
|-- col2: string (nullable = true)
|-- json_col: string (nullable = true)
|-- parsed_json: struct (nullable = true)
| |-- a: integer (nullable = true)
| |-- b: integer (nullable = true)
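One caveat worth noting (my addition, not part of the original answer): json.loads raises an exception on NULL or malformed values, which would fail the whole task. If your params column can contain such rows, you may want to guard the UDF, for example:
# Sketch: guard json.loads so NULL or malformed rows become null instead of failing the job
def safe_json_load(s):
    if s is None:
        return None
    try:
        return json.loads(s)
    except ValueError:  # includes json.JSONDecodeError on Python 3
        return None

json_load = psf.udf(safe_json_load, schema)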
You can also try passing the schema directly when loading the data frame.
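If you are on Spark 2.1 or later, the built-in from_json function can also replace the UDF entirely; this is just a sketch of that alternative, reusing the df and schema defined above:
# Alternative sketch (Spark 2.1+): parse the JSON column without a Python UDF
df_parsed = df.withColumn("parsed_json", psf.from_json("json_col", schema))
df_parsed.select("col1", "parsed_json.a", "parsed_json.b").show()
Since from_json stays in the JVM, it avoids Python serialization overhead, and by default it returns null for malformed records instead of raising an error.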
Upvotes: 2