saquiel

Reputation: 621

How to convert a Spark rdd containing np.array (or list) to a Spark DataFrame?

from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.types import *

# Create a SparkSession
sc = SparkSession.builder.appName("SparkSQL").getOrCreate()

rdd = sc.sparkContext.parallelize(np.array([1.1,2.3,3,4,5,6,7,8,9,10]))
print(rdd.collect())
#[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]

Schema = StructType([       
    StructField('name', ArrayType(FloatType(), True))])

spark_df = rdd.toDF(Schema)
spark_df.show()
[1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
22/04/03 16:04:28 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  ...
  File "/home/zenbook/.local/lib/python3.9/site-packages/pyspark/sql/types.py", line 1398, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 1.1 in type <class 'numpy.float64'>

Note: I've removed some spark error logs for clarity.

I tried with a plain Python list and got a very similar error:

rdd = sc.sparkContext.parallelize([1.1,2.3,3,4,5,6,7,8,9,10])
print(rdd.collect())
#[1.1, 2.3, 3, 4, 5, 6, 7, 8, 9, 10]

Schema = StructType([       
    StructField('name', ArrayType(FloatType(), True))])

spark_df = rdd.toDF(Schema)
spark_df.show()
TypeError: StructType can not accept object 1.1 in type <class 'float'>

The book "Learning Spark" (O'Reilly) says that ArrayType maps to a Python list, tuple, or array.

Why I need this: I have spent three days trying to convert a byte array (hex) to a float array with a Spark DataFrame, without success. I did manage the conversion with a Spark RDD:

def get_msp(row):
    """Extract MSP from a row as a numpy array."""
    return np.frombuffer(row.MSP, dtype=np.float64)  # or append .tolist() to get a list

spectra = df.rdd.map(get_msp)
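
For the byte-to-float step on its own, here is a minimal sketch that stays on the DataFrame side. It assumes the binary column holds native-endian float64 values, which is also what np.frombuffer(..., dtype=np.float64) assumes. The names decode_doubles and doubles_udf are hypothetical helpers, not part of Spark, and the standard-library array module is swapped in for numpy so the UDF returns plain Python floats.

```python
from array import array


def decode_doubles(buf):
    """Decode raw bytes holding native-endian float64 values into a list of floats."""
    if buf is None:          # keep SQL NULLs as NULL
        return None
    values = array("d")      # "d" = C double, 8 bytes per item
    # frombytes raises ValueError if the length is not a multiple of 8
    values.frombytes(bytes(buf))
    return values.tolist()


def doubles_udf():
    """Wrap decode_doubles as a Spark UDF returning array<double> (hypothetical helper)."""
    # Imported lazily so decode_doubles can be used and tested without Spark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, DoubleType

    return udf(decode_doubles, ArrayType(DoubleType()))
```

Assuming df has a binary column named MSP, something like df.withColumn("spectrum", doubles_udf()("MSP")) should then add an array<double> column without going through the RDD API.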

Is converting a Spark RDD that contains a list/array to a Spark DataFrame perhaps not possible at the moment?

Since I want to use this array/list with Spark ML (PCA and LLA), a solution using VectorAssembler would be great too.

Upvotes: 2

Views: 958

Answers (1)

user2314737

Reputation: 29427

Spark DataFrames do not recognize numpy.float64, but you can convert the values in the RDD to Python floats (here spark is the SparkSession, which the question names sc) with:

spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType())

Demo:

rdd.collect()
# [1.1, 2.3, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]

# rdd datatypes are numpy.float64
rdd.map(lambda x: type(x)).collect() 
# [numpy.float64,
#  numpy.float64,
# . . . 
#  numpy.float64]

# create dataframe
spark.createDataFrame(rdd.map(lambda x: float(x)), FloatType()).show()
# +-----+
# |value|
# +-----+
# |  1.1|
# |  2.3|
# |  3.0|
# |  4.0|
# |  5.0|
# |  6.0|
# |  7.0|
# |  8.0|
# |  9.0|
# | 10.0|
# +-----+
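
The demo above produces one scalar per row. If the goal is the ArrayType column from the question's schema, each RDD element has to be a row (for example a 1-tuple) whose single field is a list of Python floats. The original error appears because every scalar in the RDD was treated as a row on its own. Below is a minimal sketch of that idea. to_row and arrays_to_df are hypothetical helper names, and DoubleType is used instead of FloatType on the assumption that the data is float64.

```python
def to_row(values):
    """Wrap one array-like in a 1-tuple holding a list of plain Python floats.

    Works for a list or a numpy array: float() also converts numpy.float64.
    """
    return ([float(v) for v in values],)


def arrays_to_df(spark, arrays, column="name"):
    """Build a DataFrame with a single ArrayType column, one row per input array."""
    # Imported lazily so to_row can be used and tested without Spark installed.
    from pyspark.sql.types import ArrayType, DoubleType, StructField, StructType

    schema = StructType([StructField(column, ArrayType(DoubleType(), True))])
    # Each element of `arrays` becomes one row of the resulting DataFrame.
    rdd = spark.sparkContext.parallelize(arrays).map(to_row)
    return spark.createDataFrame(rdd, schema)
```

With an existing session, arrays_to_df(spark, [np.array([1.1, 2.3, 3]), np.array([4, 5, 6])]) should give a two-row DataFrame whose name column has type array<double>.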

Upvotes: 1
