Reputation: 71
MAIN GOAL
Show or select columns from the Spark dataframe read from the parquet file.
None of the solutions mentioned in the forum work in our case.
PROBLEM
The issue occurs when the parquet file is read and queried with Spark, and is due to the presence of the special characters ,;{}()\n\t=
in column names. The problem was reproduced with a simple parquet file with two columns and five rows. The names of the columns are:
SpeedReference_Final_01 (RifVel_G0)
SpeedReference_Final_02 (RifVel_G1)
The error raised is:
Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
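To make the check behind this message concrete, here is a minimal plain-Python sketch that reports which column names contain any of the characters listed in the error. The set is copied from the error text; the helper name `find_invalid_columns` is hypothetical and is not part of PySpark.

```python
# Characters listed in the error message as invalid in attribute names.
INVALID_CHARS = set(" ,;{}()\n\t=")


def find_invalid_columns(column_names):
    """Map each offending column name to the sorted invalid characters it contains."""
    report = {}
    for name in column_names:
        bad = sorted(set(name) & INVALID_CHARS)
        if bad:
            report[name] = bad
    return report


columns = [
    "SpeedReference_Final_01 (RifVel_G0)",
    "SpeedReference_Final_02 (RifVel_G1)",
    "RifVelG0",  # a name without special characters, for comparison
]
for name, bad in find_invalid_columns(columns).items():
    print(f"{name!r}: {bad}")
```

Both example columns are flagged because of the space and the parentheses, while a name such as RifVelG0 passes.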
We are using PySpark, and the solutions we tried fall into the following categories:
Solutions based on column rename - [spark.read.parquet + rename of the obtained dataframe]
Several approaches were tried: withColumnRenamed (Issue N.2 within the script), toDF (Issue N.3) and alias (Issue N.5). None of them works in our case.
Read the parquet file into a Pandas dataframe and then create a Spark dataframe from it - [pd.read_parquet + spark.createDataFrame]
This solution works with a small parquet file (Issue N.0, i.e. WORKAROUND within the script): the created Spark dataframe can be queried successfully even though its column names contain special characters. Unfortunately it is impractical with our big parquet files (600000 rows x 1000 columns each), since creating the Spark dataframe takes far too long.
An attempt to read the parquet file into a Spark dataframe and create a new Spark dataframe from its rdd and a renamed schema is not practicable either, since extracting the rdd from the Spark dataframe raises the same error (Issue N.4).
Read the parquet file with a predefined schema (that avoids the special characters) - [spark.read.schema(...).parquet]
This solution does not work: the data in the affected columns becomes null/None, as expected, because the renamed columns are not present in the original file.
The solutions mentioned above are summarized in the Python code below and were tested with the example parquet file.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col
import pandas as pd
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# Select file
filename = 'D:/Simple.parquet'
issue_num = 0 # Workaround to issues (Equivalent to no issue)
#issue_num = 1 # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
#issue_num = 2 # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
#issue_num = 3 # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
#issue_num = 4 # Issue 4 - Unable to extract rdd from renamed dataframe
#issue_num = 5 # Issue 5 - Unable to select column with alias
if issue_num == 0:
    ################################################################################################
    # WORKAROUND - Create Spark data frame from Pandas dataframe
    df_pd = pd.read_parquet(filename)
    DF = spark.createDataFrame(df_pd)
    print('WORKAROUND')
    DF.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # +-----------------------------------+-----------------------------------+
    ################################################################################################
    # Correct management of columns with invalid characters when using spark.createDataFrame
    # spark.createDataFrame: Create a dataframe with two columns with invalid characters - OK
    # DFCREATED
    schema = StructType(
        [
            StructField("SpeedReference_Final_01 (RifVel_G0)", FloatType(), nullable=True),
            StructField("SpeedReference_Final_02 (RifVel_G1)", FloatType(), nullable=True)
        ]
    )
    row_in = [(553.523, 720.372), (553.523, 720.372), (553.523, 720.372), (553.523, 720.372), (553.523, 720.372)]
    rdd = spark.sparkContext.parallelize(row_in)
    DFCREATED = spark.createDataFrame(rdd, schema)
    DFCREATED.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # +-----------------------------------+-----------------------------------+
    DF_SEL_VAR_CREATED = DFCREATED.select(DFCREATED.columns[0]).take(2)
    for el in DF_SEL_VAR_CREATED:
        print(el)
    # Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    # Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
else:
    # spark.read: read file into dataframe - OK
    DF = spark.read.parquet(filename)
    print('ORIGINAL SCHEMA')
    DF.printSchema()
    # root
    #  |-- SpeedReference_Final_01 (RifVel_G0): float (nullable = true)
    #  |-- SpeedReference_Final_02 (RifVel_G1): float (nullable = true)
    if issue_num == 1:
        ###############################################################################################
        # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
        DF.show()
        # DF.select(DF.columns[0]).show()
        # DF_SEL_VAR = DF.select(DF.columns[0]).take(3)
        # Exception: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # (raised by each of the 3 statements above)
    elif issue_num == 2:
        ###############################################################################################
        # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)', 'RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)', 'RifVelG1')
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)
        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        # Exception: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # (raised by each of the 2 statements above)
    elif issue_num == 3:
        ###############################################################################################
        # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
        DFRENAMED = DF.toDF('RifVelG0', 'RifVelG1')
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)
        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        # Exception: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # (raised by each of the 2 statements above)
    elif issue_num == 4:
        ###############################################################################################
        # Issue 4 - Unable to extract rdd from renamed dataframe
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)', 'RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)', 'RifVelG1')
        DFRENAMED_rdd = DFRENAMED.rdd
        # Exception: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
    elif issue_num == 5:
        ###############################################################################################
        # Issue 5 - Unable to select column with alias
        DF_SEL_VAR = DF.select(col(DF.columns[0]).alias('RifVelG0')).take(3)
        # Exception: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
Do you have any idea how we can solve the problem?
Any suggestion is really appreciated.
Upvotes: 7
Views: 6585
Reputation: 71
Try something like this:
import re
import pyspark.sql.functions as f
def remove_special_characters(string: str):
    # Keep only letters, digits and spaces
    return re.sub("[^a-zA-Z0-9 ]", "", string)

DFCREATED = DFCREATED.select(
    [
        f.col(column).alias(remove_special_characters(column))
        for column in DFCREATED.columns
    ]
)
I think you can also adapt this function to remove other characters, such as spaces.
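As a hedged variation on the function above, the sketch below also replaces spaces (which appear in the list of invalid characters in the error message), keeps underscores, and guards against two columns collapsing to the same name. The helper `sanitize_column_names` and the fallback name "col" are hypothetical choices made for illustration. It only computes the new names in plain Python; it does not by itself address the read-time error described in the question.

```python
import re


def sanitize_column_names(column_names):
    """Return a dict mapping each original name to a unique sanitized name."""
    mapping = {}
    used = set()
    for name in column_names:
        # Replace every run of characters outside [A-Za-z0-9_] (spaces included)
        # with a single underscore, then trim underscores at both ends.
        clean = re.sub(r"[^A-Za-z0-9_]+", "_", name).strip("_") or "col"
        candidate = clean
        suffix = 1
        # Append a numeric suffix if another column already produced this name.
        while candidate in used:
            candidate = f"{clean}_{suffix}"
            suffix += 1
        used.add(candidate)
        mapping[name] = candidate
    return mapping


names = [
    "SpeedReference_Final_01 (RifVel_G0)",
    "SpeedReference_Final_02 (RifVel_G1)",
]
for old, new in sanitize_column_names(names).items():
    print(f"{old!r} -> {new!r}")
```

The resulting mapping could then drive the same select/alias list comprehension shown above, using mapping[column] as the alias.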
Upvotes: 0