Folkas

Reputation: 29

How to process .csv files by filtering on their titles

I've got plenty of .csv files in a dataset, but for simplicity let's assume there are three files in the dataset.

I need to process only the files with 'ASGK' in their titles. In other words, I need to filter the files by their titles using the transforms.api.FileSystem.files method. All files share the same column names.

I filter the files using a regex. Here are the two pieces of code I've been using, so far unsuccessfully.

from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re


@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:

                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()

                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)

The error I get:

Traceback (most recent call last):
  File "/myproject/datasets/ExcelFile.py", line 27, in compute
    dfs = spark.read.json(rdd)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o163.json.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (localhost executor driver): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func
    for x in iterator:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/ExcelFile.py", line 17, in process_file
    with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 1695, in __init__
    self._reader = self._engines[engine](self._io, storage_options=storage_options)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 557, in __init__
    super().__init__(filepath_or_buffer, storage_options=storage_options)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 545, in __init__
    self.book = self.load_workbook(self.handles.handle)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 568, in load_workbook
    return load_workbook(
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 344, in load_workbook
    reader = ExcelReader(filename, read_only, keep_vba,
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 123, in __init__
    self.archive = _validate_archive(fn)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 95, in _validate_archive
    archive = ZipFile(filename, 'r')
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1269, in __init__
    self._RealGetContents()
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file

The other attempt uses olefile:

def compute(input_raw, output_df, ctx):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')

            for row in pdf.to_dict('records'):
                yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)


    output_df.write_dataframe(dfs)

Traceback (most recent call last):
  File "/myproject/datasets/OleFile.py", line 33, in compute
    dfs = spark.read.json(rdd)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o320.json.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func
    for x in iterator:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/OleFile.py", line 18, in process_file
    ole = olefile.OleFileIO(f.read())
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1075, in __init__
    self.open(filename, write_mode=write_mode)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1169, in open
    self.fp = open(filename, mode)
FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n

Any help please?

Upvotes: 0

Views: 145

Answers (1)

user5233494

Reputation: 245

The first error looks like that library is trying to interpret the file as a zip archive, which is expected: .xlsx files are zip archives, so openpyxl tries to unzip whatever it is given, and a plain-text .csv is not a zip file.

File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents raise BadZipFile("File is not a zip file") zipfile.BadZipFile: File is not a zip file

The second one looks like it is trying to use the contents of your csv file as a path: olefile appears to treat a short bytes argument as a filename rather than as file content, so the bytes returned by f.read() end up being passed to open() as if they were a path.

FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n

Your regex looks correct as far as I can see.
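
If you want to keep your flatMap approach, the fix is to parse each file as CSV instead of Excel. A sketch of process_file under that assumption (pandas accepts the open file handle directly):

    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            # the handle is file-like, so pandas can read it as CSV directly;
            # 'utf-8-sig' strips the BOM (\xef\xbb\xbf) visible in your traceback
            pdf = pd.read_csv(f, dtype=str, header=0, encoding='utf-8-sig')
            pdf.columns = pdf.columns.str.lower()
            for row in pdf.to_dict('records'):
                yield json.dumps(row, default=str)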

You can read CSV directly with Spark as described in this answer: https://stackoverflow.com/a/72312808/5233494

    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
    df = (
        ctx
        .spark_session
        .read
        .option("encoding", "UTF-8")  # UTF-8 is the default
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
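
Wired into your transform (using your input_raw/output_df names instead of the raw in the linked answer), it would look roughly like this:

    @transform(
        output_df=Output(""),
        input_raw=Input(""),
    )
    def compute(input_raw, output_df, ctx):
        filesystem = input_raw.filesystem()
        hadoop_path = filesystem.hadoop_path
        # ls() yields FileStatus objects; the regex keeps only the ASGK csv files
        files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
        df = (
            ctx.spark_session
            .read
            .option("header", True)
            .csv(files)
        )
        output_df.write_dataframe(df)

Since all your files share the same columns, Spark reads them into a single DataFrame in one pass, with no pandas or JSON round-trip needed.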

Upvotes: 3

Related Questions