Reputation: 29
I've got a dataset containing many .csv files, but for simplicity let's assume there are three files in it.
I need to process only the files with 'ASGK' in their titles. In other words, I need to filter the files by title using the transforms.api.FileSystem.files method. All files share the same column names.
I filter the files using a regex. Here are two pieces of code I've been using, both unsuccessfully.
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output
import pandas as pd
import json
import re
@transform(
    output_df=Output(""),
    input_raw=Input(""),
)
def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
                pdf = pd.read_csv(xlsx_path, dtype=str, header=0)
                pdf.columns = pdf.columns.str.lower()
                for row in pdf.to_dict('records'):
                    yield json.dumps(row, default=str)

    rdd = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$').rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)
The error I get:
Traceback (most recent call last):
  File "/myproject/datasets/ExcelFile.py", line 27, in compute
    dfs = spark.read.json(rdd)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o163.json.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (localhost executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func
    for x in iterator:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/ExcelFile.py", line 17, in process_file
    with pd.ExcelFile(f.read(), engine='openpyxl') as xlsx_path:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 1695, in __init__
    self._reader = self._engines[engine](self._io, storage_options=storage_options)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 557, in __init__
    super().__init__(filepath_or_buffer, storage_options=storage_options)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_base.py", line 545, in __init__
    self.book = self.load_workbook(self.handles.handle)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/pandas/io/excel/_openpyxl.py", line 568, in load_workbook
    return load_workbook(
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 344, in load_workbook
    reader = ExcelReader(filename, read_only, keep_vba,
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 123, in __init__
    self.archive = _validate_archive(fn)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/openpyxl/reader/excel.py", line 95, in _validate_archive
    archive = ZipFile(filename, 'r')
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1269, in __init__
    self._RealGetContents()
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file
My other attempt uses olefile:
import olefile

def compute(input_raw, output_df, ctx):
    def process_file(file_status):
        with input_raw.filesystem().open(file_status.path, 'rb') as f:
            ole = olefile.OleFileIO(f.read())
            if ole.exists('Workbook'):
                d = ole.openstream('Workbook')
                pdf = pd.read_excel(d, dtype=str, header=0, engine='openpyxl')
                for row in pdf.to_dict('records'):
                    yield json.dumps(row)

    files_df = input_raw.filesystem().files(regex=r'.*ASGK.*\.csv$')
    rdd = files_df.rdd.flatMap(process_file)
    spark = ctx.spark_session
    dfs = spark.read.json(rdd)
    output_df.write_dataframe(dfs)
Traceback (most recent call last):
  File "/myproject/datasets/OleFile.py", line 33, in compute
    dfs = spark.read.json(rdd)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 241, in json
    return self._df(self._jreader.json(jrdd))
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o320.json.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (localhost executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process
    serializer.dump_stream(out_iter, outfile)
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 132, in dump_stream
    for obj in iterator:
  File "/scratch/asset-install/eaa90edc26b48342e7e38b6e8eac36f7/miniconda38/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 232, in func
    for x in iterator:
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/src/myproject/datasets/OleFile.py", line 18, in process_file
    ole = olefile.OleFileIO(f.read())
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1075, in __init__
    self.open(filename, write_mode=write_mode)
  File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/site-packages/olefile/olefile.py", line 1169, in open
    self.fp = open(filename, mode)
FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n
Any help please?
Upvotes: 0
Views: 145
Reputation: 245
The first error comes from openpyxl trying to interpret the file as a zip archive. An .xlsx file really is a zip container, so pd.ExcelFile with engine='openpyxl' expects zipped XML; when it receives plain CSV text instead, the zip check fails:

File "/scratch/standalone/51a264ef-c89c-4b36-a24f-a77c1c02a38b/code-assist/contents/transforms-python/build/conda/run-env/lib/python3.8/zipfile.py", line 1336, in _RealGetContents
    raise BadZipFile("File is not a zip file")
zipfile.BadZipFile: File is not a zip file
The second one fails because olefile appears to treat a bytes argument as in-memory file data only when it begins with the OLE2 magic signature; your CSV bytes don't, so olefile falls back to treating them as a file path and open() fails:

FileNotFoundError: [Errno 2] No such file or directory: b'\xef\xbb\xbfname,surname,country\r\nVil,Gru,Fr\r\nAnn,May,De\xc5\xbe\r\n
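A quick illustration of that fallback (the file name here is hypothetical):

# OLE2 compound files begin with this 8-byte magic signature.
OLE_MAGIC = b'\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1'

with open('some_ASGK_file.csv', 'rb') as f:  # hypothetical file
    header = f.read(8)

# A CSV starts with text (here a UTF-8 BOM plus "name,surname,..."),
# so the signature check fails and olefile treats the bytes as a path.
print(header == OLE_MAGIC)  # False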
Your regex looks correct as far as I can see.
Since these files are plain CSV, you don't need pandas or olefile at all: you can read them directly with Spark, as described in this answer: https://stackoverflow.com/a/72312808/5233494
filesystem = raw.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
df = (
    ctx
    .spark_session
    .read
    .option("encoding", "UTF-8")  # UTF-8 is the default
    .option("header", True)
    .option("inferSchema", True)
    .csv(files)
)
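For completeness, a sketch of how that snippet might sit inside a full transform (dataset paths left blank as in your question; raw here corresponds to your input_raw):

from transforms.api import transform, Input, Output

@transform(
    output_df=Output(""),  # fill in your output dataset path
    raw=Input(""),         # fill in your input dataset path
)
def compute(raw, output_df, ctx):
    filesystem = raw.filesystem()
    hadoop_path = filesystem.hadoop_path
    # ls() yields FileStatus entries; the regex keeps only the ASGK csv files
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls(regex=r'.*ASGK.*\.csv$')]
    df = (
        ctx.spark_session
        .read
        .option("header", True)
        .option("inferSchema", True)
        .csv(files)
    )
    output_df.write_dataframe(df)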
Upvotes: 3