Climbs_lika_Spyder

Reputation: 6754

How do I access the fields within a VARIANT column while reading from Kafka using Spark?

I get a usable structure as long as I don't try to access nested fields. I am reading from Kafka and writing to a table, and the failure happens on the readStream: [INVALID_EXTRACT_BASE_FIELD_TYPE] Can't extract a value from "data". Need a complex type [STRUCT, ARRAY, MAP] but got "VARIANT". SQLSTATE: 42000

Here is my readStream:

    from pyspark.sql.functions import col, current_timestamp, parse_json

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .... \
        .load() \
        .withColumn("data", parse_json(col("value").cast("string"))) \
        .select("data", "data:unique_id") \
        .withColumn("timestamp", current_timestamp())
    
    display(df)

Upvotes: 0

Views: 271

Answers (2)

Alpha Bravo

Reputation: 189

In DBR 15.3+ and in Apache Spark 4.0, you can also use try_variant_get:

    import pyspark.sql.functions as F
    # or: from pyspark.sql.functions import try_variant_get

    df = ...  # your streaming DataFrame, as in the question
    df = df.withColumn("data", F.try_variant_get(F.col("value"), "$.unique_id", "string"))

Here $.unique_id is the path to the dictionary key, with $ denoting the root. This assumes value is already of VariantType (otherwise keep the parse_json() from your example to convert it first).
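
As a self-contained illustration, here is a minimal sketch using an inline JSON literal instead of the Kafka stream (the sample data and column names are made up):

    from pyspark.sql.functions import col, parse_json, try_variant_get

    # Parse a JSON string into a VARIANT, then extract one field;
    # try_variant_get returns NULL when the value cannot be cast.
    demo = spark.createDataFrame([('{"unique_id": "abc-123"}',)], ["value"])
    demo = demo.withColumn("data", parse_json(col("value")))
    demo.select(
        try_variant_get(col("data"), "$.unique_id", "string").alias("unique_id")
    ).show()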

There are also these functions (a short sketch follows the list):

  • variant_get (which will throw an exception if it cannot cast to the desired type),
  • is_variant_null
  • schema_of_variant
  • schema_of_variant_agg
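
A minimal sketch of the first three, assuming a session with VARIANT support (DBR 15.3+ or Spark 4.0; the sample data is made up):

    from pyspark.sql.functions import (
        col, lit, parse_json, variant_get, schema_of_variant, is_variant_null
    )

    demo = spark.createDataFrame(
        [('{"unique_id": 42, "note": "hi"}',)], ["value"]
    ).withColumn("data", parse_json(col("value")))

    demo.select(
        variant_get(col("data"), "$.unique_id", "int").alias("unique_id"),  # throws on a failed cast
        schema_of_variant(col("data")).alias("inferred_schema"),            # DDL-style description
        is_variant_null(parse_json(lit("null"))).alias("json_null"),        # True: a JSON null
    ).show()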

There are also these functions listed, but they are not exposed in PySpark's functions API (they can still be reached through SQL; see the sketch after this list):

  • variant_explode (produces a set of rows that un-nests a variant object or array)
  • variant_explode_outer (like variant_explode, but also produces a single row of NULLs when the input is NULL, not a Variant, or not a Variant Array)
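
A sketch of the SQL route, where variant_explode acts as a table-valued function returning pos, key, and value columns (the inline object is made up):

    # Each top-level key/value pair of the variant becomes one row.
    spark.sql("""
        SELECT pos, key, value
        FROM variant_explode(parse_json('{"a": 1, "b": 2}'))
    """).show()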

Upvotes: 1

Climbs_lika_Spyder

Reputation: 6754

It turns out that selectExpr is required, since the data:unique_id extraction syntax is parsed as a SQL expression:

    from pyspark.sql.functions import col, current_timestamp, parse_json

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .... \
        .load() \
        .withColumn("data", parse_json(col("value").cast("string"))) \
        .selectExpr("data", "data:unique_id") \
        .withColumn("timestamp", current_timestamp())
    
    display(df)
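
For reference, the same projection can be written by wrapping the extraction in expr(), since the : operator goes through the SQL expression parser either way (a sketch, assuming the same DBR/Spark environment as above):

    from pyspark.sql.functions import col, expr

    # Equivalent to .selectExpr("data", "data:unique_id")
    df = df.select(col("data"), expr("data:unique_id").alias("unique_id"))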

Upvotes: 1
