m v

Reputation: 71

Use parquet file with special characters in column names in PySpark

MAIN GOAL
Show or select columns from the Spark dataframe read from the parquet file. None of the solutions mentioned in the forum worked in our case.

PROBLEM
The issue occurs when the parquet file is read and queried with Spark, and is caused by the special characters " ,;{}()\n\t=" in the column names. We reproduced the problem with a simple parquet file with two columns and five rows. The names of the columns are:

    • SpeedReference_Final_01 (RifVel_G0)
    • SpeedReference_Final_02 (RifVel_G1)

The error raised is:
Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
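
To see which columns would trigger this message before querying, the names can be checked against the character set quoted in the error. This is a minimal sketch in plain Python; the function name find_invalid_columns is hypothetical, and the character set is copied from the error text above rather than from Spark's source.

```python
# Characters rejected in attribute names, copied from the error message above.
INVALID_CHARS = set(" ,;{}()\n\t=")


def find_invalid_columns(column_names):
    """Return {column name: sorted offending characters} for names that would be rejected."""
    report = {}
    for name in column_names:
        bad = sorted(set(name) & INVALID_CHARS)
        if bad:
            report[name] = bad
    return report


columns = ["SpeedReference_Final_01 (RifVel_G0)", "RifVelG1"]
report = find_invalid_columns(columns)
for name, bad in report.items():
    print(repr(name), "->", bad)
```

With a Spark dataframe, the same function could be called on DF.columns, since reading the schema does not raise the error.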

We are using PySpark, and the solutions we tried fall into the following categories:

  1. Solutions based on renaming columns - [spark.read.parquet + rename of the resulting dataframe]
    We tried several approaches:

    • withColumnRenamed (Issue N.2 within the script)
    • toDF (Issue N.3)
    • alias (Issue N.5)

    None of them works in our case.

  2. Read the parquet file into a Pandas dataframe and then create a Spark dataframe from it - [pd.read_parquet + spark.createDataFrame]
    This solution works with a small parquet file (Issue N.0, i.e. WORKAROUND within the script): the created Spark dataframe can be queried successfully even though its column names contain special characters. Unfortunately it is impractical for our large parquet files (600000 rows x 1000 columns each), because creating the Spark dataframe takes far too long.

  3. Reading the parquet file into a Spark dataframe and creating a new Spark dataframe from its rdd and a renamed schema is not feasible, because extracting the rdd from the Spark dataframe raises the same error (Issue N.4).

  4. Read the parquet file with a predefined schema (that avoids the special characters) - [spark.read.schema(...).parquet]
    This solution does not work: the data in the affected columns becomes null/None, as expected, because the renamed columns are not present in the original file.
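
One alternative not tried above is to rewrite the parquet file with clean column names before Spark reads it, copying one row group at a time so the whole file never has to be loaded as in option 2. The following is only a sketch under assumptions: it requires the pyarrow package, the function names clean_name and rewrite_parquet_with_clean_names are hypothetical, and it has not been run against the files described here.

```python
import re


def clean_name(name):
    """Hypothetical helper: replace runs of non-alphanumeric characters with '_'."""
    return re.sub(r"[^A-Za-z0-9_]+", "_", name).strip("_")


def rewrite_parquet_with_clean_names(src_path, dst_path, rename=clean_name):
    """Copy src_path to dst_path one row group at a time, renaming every column."""
    # Imported lazily so clean_name stays usable without pyarrow installed.
    import pyarrow.parquet as pq

    source = pq.ParquetFile(src_path)
    new_names = [rename(name) for name in source.schema_arrow.names]
    if len(set(new_names)) != len(new_names):
        raise ValueError("renaming produced duplicate column names")

    writer = None
    try:
        for index in range(source.num_row_groups):
            # Only one row group is held in memory at a time.
            table = source.read_row_group(index).rename_columns(new_names)
            if writer is None:
                writer = pq.ParquetWriter(dst_path, table.schema)
            writer.write_table(table)
    finally:
        if writer is not None:
            writer.close()
    return new_names
```

The rewritten file could then be read with spark.read.parquet as usual, for example after calling rewrite_parquet_with_clean_names('D:/Simple.parquet', 'D:/Simple_clean.parquet'), where the output path is an assumed name.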

The solutions mentioned above are summarized in the Python code below and were tested with the example parquet file.

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

import pandas as pd

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Select file
filename = 'D:/Simple.parquet'

issue_num = 0 # Workaround to issues (Equivalent to no issue)
#issue_num = 1 # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
#issue_num = 2 # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
#issue_num = 3 # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
#issue_num = 4 # Issue 4 - Unable to extract rdd from renamed dataframe 
#issue_num = 5 # Issue 5 - Unable to select column with alias

if issue_num == 0:

    ################################################################################################
    # WORKAROUND - Create Spark data frame from Pandas dataframe
    df_pd = pd.read_parquet(filename)
    DF = spark.createDataFrame(df_pd)
    print('WORKAROUND')
    DF.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # |                  553.5228271484375|                     720.3720703125|
    # +-----------------------------------+-----------------------------------+

    ################################################################################################
    # Correct management of columns with invalid characters when using spark.createDataFrame
    # spark.createDataFrame: Create a dataframe with two columns with invalid characters - OK
    # DFCREATED
    schema = StructType(
        [
            StructField("SpeedReference_Final_01 (RifVel_G0)", FloatType(), nullable=True),
            StructField("SpeedReference_Final_02 (RifVel_G1)", FloatType(), nullable=True)
        ]
    )

    row_in = [(553.523,720.372), (553.523,720.372), (553.523,720.372), (553.523,720.372), (553.523,720.372)]

    rdd=spark.sparkContext.parallelize(row_in)
    DFCREATED = spark.createDataFrame(rdd, schema)
    DFCREATED.show()
    # +-----------------------------------+-----------------------------------+
    # |SpeedReference_Final_01 (RifVel_G0)|SpeedReference_Final_02 (RifVel_G1)|
    # +-----------------------------------+-----------------------------------+
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # |                            553.523|                            720.372|
    # +-----------------------------------+-----------------------------------+
    DF_SEL_VAR_CREATED = DFCREATED.select(DFCREATED.columns[0]).take(2)
    for el in DF_SEL_VAR_CREATED:
        print(el)
    #Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    #Row(SpeedReference_Final_01 (RifVel_G0)=553.5230102539062)
    
else:
    # spark.read: read file into dataframe - OK
    DF = spark.read.parquet(filename)
    print('ORIGINAL SCHEMA')
    DF.printSchema()
    # root
    #  |-- SpeedReference_Final_01 (RifVel_G0): float (nullable = true)
    #  |-- SpeedReference_Final_02 (RifVel_G1): float (nullable = true)
    
    if issue_num == 1:
        ###############################################################################################    
        # Issue 1 - Unable to show dataframe or select column with name containing invalid character(s)
        DF.show()
        # DF.select(DF.columns[0]).show()
        # DF_SEL_VAR = DF.select(DF.columns[0]).take(3)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 3 previous statements

    elif issue_num == 2:
        ###############################################################################################    
        # Issue 2 - Unable to show dataframe or select column after rename (using withColumnRenamed)
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)','RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)','RifVelG1')
       
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)

        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 2 previous statements

    elif issue_num == 3:
        ###############################################################################################    
        # Issue 3 - Unable to show dataframe or select column after rename (using toDF)
        DFRENAMED = DF.toDF('RifVelG0', 'RifVelG1')
    
        print('RENAMED SCHEMA')
        DFRENAMED.printSchema()
        # root
        #  |-- RifVelG0: float (nullable = true)
        #  |-- RifVelG1: float (nullable = true)

        DFRENAMED.show()
        # DF_SEL_VAR_RENAMED = DFRENAMED.select(DFRENAMED.RifVelG0).take(2)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.
        # on all 2 previous statements

    elif issue_num == 4:
        ###############################################################################################    
        # Issue 4 - Unable to extract rdd from renamed dataframe 
        DFRENAMED = DF.withColumnRenamed('SpeedReference_Final_01 (RifVel_G0)','RifVelG0').withColumnRenamed('SpeedReference_Final_02 (RifVel_G1)','RifVelG1')
        DFRENAMED_rdd = DFRENAMED.rdd
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

    elif issue_num == 5:
        ###############################################################################################    
        # Issue 5 - Unable to select column with alias
        DF_SEL_VAR = DF.select(col(DF.columns[0]).alias('RifVelG0')).take(3)
        #ECC: Attribute name "SpeedReference_Final_01 (RifVel_G0)" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.

Do you have any idea how we can solve the problem?

Any suggestion is really appreciated.

Upvotes: 7

Views: 6585

Answers (1)

Vidar

Reputation: 71

Try something like this:

import re
import pyspark.sql.functions as f

def remove_special_characters(string: str):
    return re.sub("[^a-zA-Z0-9 ]", "", string)

DFCREATED = DFCREATED.select(
    [
        f.col(column).alias(remove_special_characters(column))
        for column in DFCREATED.columns
    ]
)

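Note that this pattern keeps spaces, and a space is one of the invalid characters listed in the error message. I think you can use the same function to remove other things such as spaces: drop the space from the character class in the regular expression.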
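
Stripping characters can make two different columns end up with the same name, which matters with around 1000 columns. As a hedged variation on the function above, this sketch replaces each run of disallowed characters (spaces included) with an underscore and appends a numeric suffix on collisions. The name sanitize_column_names and the "col" fallback are assumptions made for illustration.

```python
import re


def sanitize_column_names(column_names):
    """Return new names, in input order, made only of letters, digits and underscores."""
    used = set()
    new_names = []
    for name in column_names:
        # Replace every run of disallowed characters with a single underscore.
        base = re.sub(r"[^A-Za-z0-9_]+", "_", name).strip("_") or "col"
        candidate = base
        suffix = 1
        # Append _2, _3, ... until the name is unique.
        while candidate in used:
            suffix += 1
            candidate = f"{base}_{suffix}"
        used.add(candidate)
        new_names.append(candidate)
    return new_names


print(sanitize_column_names(["SpeedReference_Final_01 (RifVel_G0)", "a b", "a,b"]))
```

The returned list is aligned with the input, so it can be zipped with the dataframe's columns in the select/alias comprehension shown in this answer.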

Upvotes: 0
