BMac

Reputation: 303

PySpark DataFrame read is shifting column contents by an inconsistent number of columns


Hello, hoping someone can help me with this. I'm using PySpark to read several large files (around 80 GB each, six or so of them) on a cluster with one leader node and two worker nodes.

The file being read has over 1,000 columns and many millions of rows. When I try to process this file with the PySpark read function, it shifts column values out of place by an inconsistent number of columns.

Things I have tried:

  1. Counted the delimiters on each row. The count appears to be consistent across rows, but I've only checked one file (a sketch for checking every file at once follows this list).
  2. Tried fixed-width parsing by using read.text(s3_path) and taking a substr for each column. This did not work.
  3. Tried to infer nulls from empty strings with .option("emptyValue", '').
  4. When I look at the raw data in the CSV, I do not see unescaped quotes or \t characters in it. Reading the problematic rows in pandas parses them correctly...
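
For item 1, here is a minimal sketch of that delimiter check run across every file at once; the tab delimiter comes from my read code below, while the path and app name are placeholders:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("DelimiterCheck").getOrCreate()

# Read every input file as raw text, one DataFrame row per line,
# with no CSV parsing applied.
lines = spark.read.text("s3a://my_bucket/my_prefix/*")  # hypothetical path

# Count the tab delimiters on each line. A well-formed extract should
# yield a single distinct count; any other count flags the rows that
# end up shifted.
(lines
    .select((F.size(F.split(F.col("value"), "\t")) - 1).alias("n_tabs"))
    .groupBy("n_tabs")
    .count()
    .orderBy(F.desc("count"))
    .show())

And here is the read code itself: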
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, StructType


class SparkCode:
    def __init__(self):
        self.session = SparkSession.builder.appName("MyApp").getOrCreate()
        # Schema construction was truncated in the original post; the
        # .add(...) calls defining each field are elided.
        self.schema = StructType()

    def process_file(self, s3_bucket, s3_key):
        s3_path = f's3a://{s3_bucket}/{s3_key}'

        df_reader = self.session.read

        responses = df_reader.option("delimiter", "\t") \
            .csv(s3_path) \
            .select(
                col('_c5').alias("state").cast(StringType()),
                col('_c6').alias("zip_code").cast(StringType()),
                col('_c7').alias("zip_plus_4").cast(StringType()),
                col('_c8').alias("carrier_route").cast(StringType()),
                col('_c9').alias("county_code").cast(StringType()),
                col('_c10').alias("county_name").cast(StringType()),
                col('_c11').alias("phone_number").cast(StringType()),
                # ... columns _c12 through _c744 elided ...
                col('_c745').alias("another_important_col").cast(StringType()))
        responses.show()
        # "all_the_above_cols" is a placeholder for the columns selected above.
        responses.select("all_the_above_cols").write.mode("append").parquet("s3a://other_s3_path" + ".parquet/")

    

Question: given that investigating each problematic row takes a long time, what can I do to improve the above code so that it parses this data correctly? Any suggestions or tips are appreciated.

Upvotes: 1

Views: 1809

Answers (1)

Nikunj Kakadiya

Reputation: 2998

There are a few things that you could try out.

  1. If you are opening your CSV file in Excel to check it and you don't see any issues, try opening it in Notepad++ and look for any differences. Sometimes there is something odd in a CSV file that Excel hides when it opens it.
  2. Check whether your CSV file has any column whose value can span multiple lines. If you find one, add .option("multiline", "true") and see if that works.
  3. If there are quotes or escape strings in your data that might be causing the issue, you could add two more options and see if it works, i.e. .option("quote", "\"") and .option("escape", "\""), as shown in the combined example below.
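
Putting these options together on the reader from the question would look roughly like this (the path and app name are placeholders; the delimiter is the one from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").getOrCreate()
s3_path = "s3a://my_bucket/my_key"  # placeholder for the real path

# Reader with the suggested options added on top of the original delimiter.
responses = (spark.read
    .option("delimiter", "\t")    # tab-separated input, as in the question
    .option("multiline", "true")  # allow quoted field values to span lines
    .option("quote", "\"")        # fields are wrapped in double quotes
    .option("escape", "\"")       # a doubled quote inside a field is literal
    .csv(s3_path))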

I had a similar issue and got around it by adding these extra options to the reader, and it worked fine.

Upvotes: 3
