Reputation: 6754
I get a nice structure if I do not try to access nested fields. I am reading from Kafka and writing to a table. The issue happens on the readStream: I get
[INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "data". Need a complex type [STRUCT, ARRAY, MAP] but got "VARIANT". SQLSTATE: 42000
Here is my readStream:
from pyspark.sql.functions import col, parse_json, current_timestamp

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "latest") \
    .... \
    .load() \
    .withColumn("data", parse_json(col("value").cast("string"))) \
    .select("data, data:unique_id") \
    .withColumn("timestamp", current_timestamp())
display(df)
Upvotes: 0
Views: 271
Reputation: 189
In DBR 15.3+ and in Apache Spark 4, you can also use:
import pyspark.sql.functions as F  # or: from pyspark.sql.functions import try_variant_get

df = ... \
    .withColumn("data", F.try_variant_get(F.col("value"), "$.unique_id", "string"))
Here $.unique_id is the path to the dictionary key, with $ taking the position of the root, assuming value is already a VariantType (or retain the parse_json() from your example to cast it as such).
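To make the path/cast semantics concrete, here is a pure-Python sketch (not the Spark API, and not runnable against a DataFrame) of what try_variant_get does with a $-rooted path: extract the value at the path from the parsed JSON, cast it to the requested type, and return NULL (None) instead of raising when the extraction or cast fails.

```python
import json

def try_variant_get(doc: str, path: str, target_type: str):
    """Sketch of try_variant_get semantics: '$'-rooted path lookup,
    cast to target_type, None on any failure (variant_get would raise)."""
    casts = {"string": str, "int": int, "double": float}
    try:
        node = json.loads(doc)  # stand-in for parse_json()
        # '$.a.b' -> walk keys 'a' then 'b'; numeric steps index into arrays
        for key in path.lstrip("$").strip(".").split("."):
            if key:
                node = node[key] if isinstance(node, dict) else node[int(key)]
        return casts[target_type](node)
    except (KeyError, IndexError, ValueError, TypeError):
        return None

print(try_variant_get('{"unique_id": 42}', "$.unique_id", "string"))  # "42"
print(try_variant_get('{"other": 1}', "$.unique_id", "string"))       # None
```

The real function operates on a VARIANT column inside Spark; this only illustrates why a missing key yields NULL rather than an error.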
There are also these functions:
- variant_get (throws an exception if it cannot cast to the desired type)
- is_variant_null
- schema_of_variant
- schema_of_variant_agg
There are also these functions listed, but they are not supported in PySpark:
- variant_explode (creates a set of records which un-nests the variant dataset)
- variant_explode_outer (like variant_explode, but also produces a single row of NULLs when the input is NULL or not a Variant or Variant Array)
Upvotes: 1
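The un-nesting behavior of variant_explode described in that list can be sketched in plain Python (this is an illustration of the semantics, not the Spark table function, which emits pos/key/value columns):

```python
import json

def variant_explode(doc: str):
    """Sketch: un-nest a variant into (pos, key, value) rows.
    key is None for array elements; non-explodable input yields no rows."""
    try:
        node = json.loads(doc)
    except ValueError:
        return []
    if isinstance(node, dict):
        return [(pos, key, val) for pos, (key, val) in enumerate(node.items())]
    if isinstance(node, list):
        return [(pos, None, val) for pos, val in enumerate(node)]
    return []  # scalar: nothing to un-nest

def variant_explode_outer(doc: str):
    """Sketch: like variant_explode, but emit one all-NULL row
    instead of dropping non-explodable input."""
    return variant_explode(doc) or [(None, None, None)]

print(variant_explode('[10, 20]'))      # [(0, None, 10), (1, None, 20)]
print(variant_explode_outer('5'))       # [(None, None, None)]
```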
Reputation: 6754
It turns out that using selectExpr is required (with each expression passed as a separate string):
from pyspark.sql.functions import col, parse_json, current_timestamp

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("subscribe", TOPIC) \
    .option("startingOffsets", "latest") \
    .... \
    .load() \
    .withColumn("data", parse_json(col("value").cast("string"))) \
    .selectExpr("data", "data:unique_id") \
    .withColumn("timestamp", current_timestamp())
display(df)
Upvotes: 1