Reputation: 621
How to convert a Spark rdd containing np.array (or list) to a Spark DataFrame?
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import *
# Create a SparkSession
sc = SparkSession.builder.appName("SparkSQL").getOrCreate()
rdd = sc.sparkContext.parallelize(np.array([1.1,2.3,3,4,5,6,7,8,9,10]))
print(rdd.collect())
#[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
Schema = StructType([
    StructField('name', ArrayType(FloatType(), True))])
spark_df = rdd.toDF(Schema)
spark_df.show()
[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
22/04/03 16:04:28 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...
File "/home/zenbook/.local/lib/python3.9/site-packages/pyspark/sql/types.py", line 1398, in verify_struct
raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 1.1 in type <class 'numpy.float64'>
Note: I've removed some spark error logs for clarity.
I've also tried with a plain Python list and get a very similar error:
rdd = sc.sparkContext.parallelize([1.1,2.3,3,4,5,6,7,8,9,10])
print(rdd.collect())
#[1.1, 2.3, 3, 4, 5, 6, 7, 8, 9, 10]
Schema = StructType([
    StructField('name', ArrayType(FloatType(), True))])
spark_df = rdd.toDF(Schema)
spark_df.show()
TypeError: StructType can not accept object 1.1 in type <class 'float'>
In the Learning Spark book (O'Reilly), it says that ArrayType maps to the Python list, tuple, or array types.
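From the error, I suspect that with this schema each record must itself be a sequence wrapped in a row, not a single scalar. Here is a minimal sketch of the shape of data the schema above seems to expect (my own guess, using an illustrative 5-element list):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType

spark = SparkSession.builder.appName("ArrayTypeSketch").getOrCreate()
schema = StructType([StructField('name', ArrayType(FloatType(), True))])
row = ([float(v) for v in [1.1, 2.3, 3, 4, 5]],)  # one row, one array-valued field
spark.createDataFrame([row], schema).show(truncate=False)
# expected: a single row whose 'name' column is [1.1, 2.3, 3.0, 4.0, 5.0]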
Why I need this: I haven't been able to convert a byte array (hex) to a float array using a Spark DataFrame (three days of trying), but I have managed to do it with a Spark RDD:
def get_msp(row):
    """Extract the MSP field from a row as a numpy array."""
    return np.frombuffer(row.MSP, dtype=np.float64)  # or a list by adding .tolist()

spectra = df.rdd.map(get_msp)
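For illustration, here is a tiny self-contained example (made-up bytes, not my real data) of what np.frombuffer returns for such a binary field:
import numpy as np

raw = np.array([1.1, 2.3], dtype=np.float64).tobytes()  # two float64 values as raw bytes
print(np.frombuffer(raw, dtype=np.float64))           # [1.1 2.3]
print(np.frombuffer(raw, dtype=np.float64).tolist())  # [1.1, 2.3]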
Maybe converting a Spark RDD of lists/arrays into a Spark DataFrame just isn't achievable at the moment?
Since I want to use this array/list with Spark ML (PCA and LLA), a solution based on VectorAssembler would be great too.
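Concretely, the end state I'm after would look roughly like this (a sketch of what I want, with an assumed 'features' column name, building on the spectra RDD above):
from pyspark.ml.linalg import Vectors

# sketch: turn each numpy array into a Python-float dense vector, one row per array
vec_df = spectra.map(lambda arr: (Vectors.dense([float(v) for v in arr]),)).toDF(["features"])
vec_df.printSchema()  # features: vector
A vector column like that could then be fed directly to pyspark.ml.feature.PCA.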
Upvotes: 2
Views: 958
Reputation: 29427
The numpy.float64 type is not known to Spark DataFrames, but you can convert the values in the RDD to Python floats with:
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType())
Demo:
rdd.collect()
# [1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
# rdd datatypes are numpy.float64
rdd.map(lambda x: type(x)).collect()
# [numpy.float64,
# numpy.float64,
# . . .
# numpy.float64]
# create dataframe
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType()).show()
# +-----+
# |value|
# +-----+
# | 1.1|
# | 2.3|
# | 3.0|
# | 4.0|
# | 5.0|
# | 6.0|
# | 7.0|
# | 8.0|
# | 9.0|
# | 10.0|
# +-----+
Upvotes: 1