Khan Saab

Reputation: 511

Unsupported operation exception from spark: Schema for type org.apache.spark.sql.types.DataType is not supported

Spark Streaming:

I am receiving a dataframe that consists of two columns. The first column is of string type and contains a JSON string, and the second column contains the schema for each value (the first column).

Batch: 0
-------------------------------------------
+--------------------+--------------------+
|               value|              schema|
+--------------------+--------------------+
|{"event_time_abc...|`event_time_abc...|
+--------------------+--------------------+

The table is stored in the val input (an immutable variable). I am using the DataType.fromDDL function to convert the string type to a JSON DataFrame in the following way:

val out = input.select(from_json(col("value").cast("string"), ddl(col("schema"))))

where ddl is a predefined function, DataType.fromDDL(_:String):DataType, in Spark (Scala), but I have registered it so that I can use it on a whole column instead of a single string. I have done it in the following way:

val ddl:UserDefinedFunction = udf(DataType.fromDDL(_:String):DataType)

and here is the final transformation on both columns, value and schema, of the input table:

val out = input.select(from_json(col("value").cast("string"), ddl(col("schema"))))

However, I get an exception from the registration at this line:

val ddl:UserDefinedFunction = udf(DataType.fromDDL(_:String):DataType)

The error is:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported
 

If I use:

val out = input.select(from_json(col("value").cast("string"), DataType.fromDDL("`" + "event_time_human2" + "`" + " STRING")).alias("value"))

then it works, but as you can see I am only using a string (manually typed, taken from the schema column) inside the function DataType.fromDDL(_:String):DataType.

So how can I apply this function to the whole column without registering it, or is there any other way to register the function?

EDIT: the from_json function's first argument requires a column, while the second argument requires a schema and not a column. Hence, I guess a manual approach is required to parse each value field with its corresponding schema field. After some investigation I found out that DataFrames do not support DataType.
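
For reference, a minimal sketch of what I mean (names are just examples; the schema argument is a plain DataType value built on the driver, not a column):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.DataType

// Works: the schema is a DataType value known on the driver side
val schema: DataType = DataType.fromDDL("`event_time_human2` STRING")
val parsed = input.select(from_json(col("value").cast("string"), schema).alias("value"))

// What I actually need but cannot do: take the schema per row from the schema column
// val parsed = input.select(from_json(col("value").cast("string"), <schema from col("schema")>))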


Since a bounty has been set on this question, I would like to provide additional information regarding the data and schema. The schema is defined in DDL (string type) and it can be parsed with the fromDDL function. The value is a simple JSON string that will be parsed with the schema we derive using the fromDDL function.

The basic idea is that each value has its own schema and needs to be parsed with the corresponding schema. A new column should be created where the result will be stored.

Data: Here is one example of the data:

value = {"event_time_human2":"09:45:00 +0200 09/27/2021"}

schema = "`event_time_human2` STRING"

There is no need to convert to a proper time format; just a string will be fine.

It is in a streaming context, so not all approaches work.

Upvotes: 2

Views: 5261

Answers (1)

abiratsis

Reputation: 7316

Schemas are applied and validated before runtime, that is, before the Spark code is executed on the executors. Parsed schemas must be part of the execution plan, therefore schema parsing can't be executed dynamically as you intended. This is the reason you see the exception java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported only for the UDF. It follows that DataType.fromDDL should be used only inside driver code and not in runtime/executor code, which is the code within your UDF function.

Inside the UDF function Spark has already transformed the imported data, applying the schemas that you specified on the driver side. This is why you can't use DataType.fromDDL directly in your UDF: it is essentially useless there. All of the above means that inside UDF functions we can only use primitive Scala/Java types and some wrappers provided by the Spark API, e.g. WrappedArray.
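
To illustrate the return-type restriction (a minimal sketch, not taken from your code):

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.DataType

// Supported: String maps to a Catalyst type, so the UDF can be defined
val toUpper = udf((s: String) => s.toUpperCase)

// Not supported: Spark has no schema for DataType, so this throws
// java.lang.UnsupportedOperationException: Schema for type ... DataType is not supported
// val ddl = udf((s: String) => DataType.fromDDL(s))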

An alternative could be to collect all the schemas on the driver and then create a map with a (schema, dataframe) pair for each schema.

Keep in mind that collecting data to the driver is an expensive operation, and it makes sense only if you have a reasonable number of unique schemas, i.e. at most a few thousand. Also, applying these schemas to each dataset needs to be done sequentially in the driver, which is quite expensive too. Therefore it is important to realize that the suggested solution will only work efficiently if the number of unique schemas is limited.

Up to this point, your code could look like this:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

import spark.implicits._

val df = Seq(
  ("""{"event_time_human2":"09:45:00 +0200 09/27/2021", "name":"Pinelopi"}""", "`event_time_human2` STRING, name STRING"),
  ("""{"first_name":"Pin", "last_name":"Halen"}""", "first_name STRING, last_name STRING"),
  ("""{"code":993, "full_name":"Sofia Loren"}""", "code INT, full_name STRING")
).toDF("value", "schema")

// One row per unique schema string
val schemaDf = df.select("schema").distinct()

// Collect the unique schemas to the driver and build a (schema -> dataframe) map
val dfBySchema = schemaDf.collect().map{ row =>
  val schemaValue = row.getString(0)
  val ddl = StructType.fromDDL(schemaValue)

  // Keep only the rows that use this schema and parse their value column with it
  val filteredDf = df.where($"schema" === schemaValue)
                     .withColumn("value", from_json($"value", ddl))

  (schemaValue, filteredDf)
}.toMap

// Map(
//   `event_time_human2` STRING, name STRING -> [value: struct<event_time_human2: string, name: string>, schema: string], 
//   first_name STRING, last_name STRING -> [value: struct<first_name: string, last_name: string>, schema: string], 
//   code INT, full_name STRING -> [value: struct<code: int, full_name: string>, schema: string]
// )

Explanation: first we gather each unique schema with schemaDf.collect(). Then we iterate through the schemas and filter the initial df based on the current schema. We also use from_json to convert the current string value column to the specific schema.

Note that we can't have one common column with different data types; this is the reason we are creating a different df for each schema instead of one final df.
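
Each entry of the map can then be used on its own, for example (a small usage sketch based on the sample data above):

// Look up the dataframe parsed with one particular schema and flatten its struct column
val codeDf = dfBySchema("code INT, full_name STRING")
codeDf.select($"value.code", $"value.full_name").show()
// +----+-----------+
// |code|  full_name|
// +----+-----------+
// | 993|Sofia Loren|
// +----+-----------+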

Upvotes: 2
