PPC

Reputation: 167

How do I convert array<FloatType> to BinaryType in Spark DataFrames using Scala?

In a Spark DataFrame, one of my columns contains an array of float values. How can I convert that column to BinaryType?

Here is some sample data and how it looks:

val df = spark.sparkContext.parallelize(Seq(("one", Array[Float](1, 2, 3, 4, 5)), ("two", Array[Float](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df.show()
+----+--------------------+
|Name|              Values|
+----+--------------------+
| one|[1.0, 2.0, 3.0, 4...|
| two|[6.0, 7.0, 8.0, 9...|
+----+--------------------+

The current schema is:

Name:string
Values:array
    element:float

In the example above, the Values field is an array of floats. How can I convert it to BinaryType?

The expected schema is:

Name:string
Values:binary

Upvotes: 1

Views: 1844

Answers (2)

user6860682

Reputation:

You can solve this with a UDF that converts the type:

val df = spark.sparkContext.parallelize(Seq(("one", Array[Float](1, 2, 3, 4, 5)), ("two", Array[Float](6, 7, 8, 9, 10)))).toDF("Name", "Values")

import org.apache.spark.sql.functions.udf
import spark.implicits._
import scala.collection.mutable.WrappedArray

// The UDF must return Array[Byte]: Spark maps Array[Byte] to BinaryType,
// whereas returning a Seq[Byte] would give you array<tinyint> instead.
val toByteArray = udf { values: WrappedArray[Float] => values.map(_.toByte).toArray }

val result = df.withColumn("Values", toByteArray($"Values"))

result.show()
result.printSchema

Important

This is neither safe nor efficient. It is not safe because a single NULL or malformed entry will crash the whole job, and it is not efficient because UDFs are opaque to the Catalyst optimizer. For example, Seq(("one", Array[Float](1, 2, 3, 4, 5)), ("two", null)).toDF("Name", "Values") will crash the code above. If possible, avoid this cast altogether, or handle the corner cases inside your UDF, as sketched below.
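Here is a minimal sketch of such a null-tolerant variant (toByteArraySafe is a hypothetical name, and this assumes the same spark-shell session, where spark.implicits._ is already in scope):

import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

// Sketch of a null-tolerant variant: Option(values) turns a NULL input into
// None, so the row yields NULL instead of a NullPointerException.
val toByteArraySafe = udf { values: WrappedArray[Float] =>
  Option(values).map(_.map(_.toByte).toArray).orNull
}

val dfWithNull = spark.sparkContext
  .parallelize(Seq(("one", Array[Float](1, 2, 3, 4, 5)), ("two", null)))
  .toDF("Name", "Values")

dfWithNull.withColumn("Values", toByteArraySafe($"Values")).show()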

Upvotes: 1

Teja Parimi

Reputation: 111

You need to write a UDF that takes Array[Float] and returns Array[Byte]:

import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

// toArray produces an Array[Byte], which Spark maps to BinaryType
val binUdf = udf((arr: WrappedArray[Float]) => arr.toArray.map(_.toByte))

scala> df.withColumn("Values", binUdf($"Values")).printSchema
root
 |-- Name: string (nullable = true)
 |-- Values: binary (nullable = true)

Alternatively, you can avoid the cast entirely by creating the DataFrame with Array[Byte] in the first place:

val df = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
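Note that _.toByte truncates each float to a single byte (3.7f becomes 3, and anything outside the byte range wraps around), so the UDF above is only faithful for small integral values. If you need to preserve the exact float values, one sketch is to pack each float into 4 bytes with java.nio.ByteBuffer (floatsToBinary is a hypothetical name, and the little-endian byte order is an assumption; use whatever order your consumer expects):

import java.nio.{ByteBuffer, ByteOrder}
import org.apache.spark.sql.functions.udf
import scala.collection.mutable.WrappedArray

// Pack each 4-byte float into a buffer; buf.array() is an Array[Byte],
// so the resulting column is BinaryType with no loss of precision.
val floatsToBinary = udf { values: WrappedArray[Float] =>
  val buf = ByteBuffer.allocate(4 * values.length).order(ByteOrder.LITTLE_ENDIAN)
  values.foreach(v => buf.putFloat(v))
  buf.array()
}

df.withColumn("Values", floatsToBinary($"Values")).printSchema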

Upvotes: 1
