Reputation: 565
I have the sample data shown below. I need to convert the columns (ABS, ALT) from string to Array[StructType] using Spark Scala code. Any help would be much appreciated.
With the help of a UDF, I was able to convert from string to ArrayType, but I need some help converting from string to Array[StructType] for these two columns (ABS, ALT).
VIN          TT  MSG_TYPE  ABS                          ALT
MSGXXXXXXXX  1   SIGL      [{"E":1569XXXXXXX,"V":0.0}]  [{"E":156957XXXXXX,"V":0.0}]
Current schema (df.printSchema):
root
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: string (nullable = true)
|-- ALT: string (nullable = true)
Expected schema:
|-- VIN: string (nullable = true)
|-- TT: long (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
Upvotes: 2
Views: 3755
Reputation: 565
It also works if you try it as below:
import org.apache.spark.sql.types.{ArrayType, DoubleType, LongType, StructField, StructType}
import org.apache.spark.sql.functions.from_json
import spark.implicits._ // for $"colName" (assumes a SparkSession named spark)

// Schema matching the JSON arrays: [{"E": <long>, "V": <double>}]
val schema = ArrayType(StructType(Seq(StructField("E", LongType), StructField("V", DoubleType))))
val final_df = newDF.withColumn("ABS", from_json($"ABS", schema)).withColumn("ALT", from_json($"ALT", schema))
final_df.printSchema:
root
|-- VIN: string (nullable = true)
|-- TT: string (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = true)
| | |-- V: double (nullable = true)
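For a quick sanity check, here is a minimal sketch that builds a one-row DataFrame (the epoch values are hypothetical stand-ins for the masked sample data) and applies the same from_json conversion, reusing the schema and imports from above:
// One-row DataFrame mirroring the sample; the epoch values are hypothetical
val sampleDF = Seq(
  ("MSGXXXXXXXX", 1L, "SIGL",
    """[{"E":1569000000000,"V":0.0}]""",
    """[{"E":1569570000000,"V":0.0}]""")
).toDF("VIN", "TT", "MSG_TYPE", "ABS", "ALT")

val converted = sampleDF
  .withColumn("ABS", from_json($"ABS", schema))
  .withColumn("ALT", from_json($"ALT", schema))

// Struct fields become directly addressable after the conversion:
converted.select($"ABS".getItem(0).getField("E")).show()
Note that from_json returns null for rows whose string does not match the schema, so malformed input shows up as null arrays rather than a runtime error.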
Upvotes: 3
Reputation: 14845
You can use a UDF to parse the JSON and transform it into arrays of structs.
First, define a function that parses the JSON (based on this answer):
import scala.util.parsing.json.JSON

case class Data(E: Long, V: Double)

// Typed extractors to destructure the Any-based result of JSON.parseFull
class CC[T] extends Serializable { def unapply(a: Any): Option[T] = Some(a.asInstanceOf[T]) }
object M extends CC[Map[String, Any]]
object L extends CC[List[Any]]
object D extends CC[Double]

def toStruct(in: String): Array[Data] = {
  if (in == null || in.isEmpty) return new Array[Data](0)
  val result = for {
    Some(L(list)) <- List(JSON.parseFull(in)) // parse the top-level JSON array
    M(data) <- list                           // each element is a JSON object
    D(e) = data("E")                          // JSON numbers are parsed as Double
    D(v) = data("V")
  } yield {
    Data(e.toLong, v)
  }
  result.toArray
}
This function returns an array of Data objects that already have the correct structure. Now we use this function to define a UDF:
import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the 'colName symbol syntax used below

val ts: String => Array[Data] = toStruct(_)
val toStructUdf = udf(ts)
Finally, we call the UDF (for example, in a select statement):
val df = ...
val newdf = df.select('VIN, 'TT, 'MSG_TYPE, toStructUdf('ABS).as("ABS"), toStructUdf('ALT).as("ALT"))
newdf.printSchema()
Output:
root
|-- VIN: string (nullable = true)
|-- TT: string (nullable = true)
|-- MSG_TYPE: string (nullable = true)
|-- ABS: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = false)
| | |-- V: double (nullable = false)
|-- ALT: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- E: long (nullable = false)
| | |-- V: double (nullable = false)
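For a quick standalone check of the parser (again with a hypothetical payload in place of the masked values):
// Hypothetical payload; the real epoch values are masked in the question
val parsed = toStruct("""[{"E":1569000000000,"V":0.0}]""")
parsed.foreach(println) // Data(1569000000000,0.0)

// Null and empty inputs yield an empty array instead of failing
assert(toStruct(null).isEmpty)
assert(toStruct("").isEmpty)
One caveat: scala.util.parsing.json.JSON is deprecated in recent Scala versions, so for newer codebases the built-in from_json approach from the other answer may be the simpler choice.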
Upvotes: 1