Reputation: 146
I want to ignore the paths that generate the error 'Path does not exist' when I read parquet files with PySpark. For example, I have a list of paths:
list_paths = ['path1', 'path2', 'path3']
and read the files like:
dataframe = spark.read.parquet(*list_paths)
but the path path2 does not exist. In general, I do not know in advance which paths do not exist, so I want to ignore path2 automatically. How can I do this and obtain only one dataframe?
Upvotes: 2
Views: 4018
Reputation: 1895
Adding to @blackbishop's answer, you can further use Hadoop pattern strings to check for files/objects before loading them.
It's also worth noting that spark.read.load() accepts lists of path strings.
from functools import partial
from typing import Iterator
from pyspark.sql import SparkSession

def iterhadoopfiles(spark: SparkSession, path_pattern: str) -> Iterator[str]:
    """Return iterator of object/file paths that match path_pattern."""
    sc = spark.sparkContext
    FileUtil = sc._gateway.jvm.org.apache.hadoop.fs.FileUtil
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    hadoop_config = sc._jsc.hadoopConfiguration()
    p = Path(path_pattern)
    statuses = p.getFileSystem(hadoop_config).globStatus(p)
    if statuses is None:
        # globStatus returns null (None in Python) for a non-glob path
        # that does not exist, so treat that as "no matches".
        return iter(())
    return (str(x) for x in FileUtil.stat2Paths(statuses))

def pathnotempty(spark: SparkSession, path_pattern: str) -> bool:
    """Return true if path_pattern matches at least one object/file."""
    try:
        next(iterhadoopfiles(spark, path_pattern))
    except StopIteration:
        return False
    return True

paths_to_load = list(filter(partial(pathnotempty, spark), ["file:///*.parquet"]))
spark.read.format('parquet').load(paths_to_load)
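Applied to the question's placeholder paths, a sketch of the same idea (names taken from the question, not real paths):
# Drop paths that match nothing (e.g. the nonexistent path2), then load the rest.
list_paths = ['path1', 'path2', 'path3']
existing_paths = list(filter(partial(pathnotempty, spark), list_paths))
dataframe = spark.read.format('parquet').load(existing_paths)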
Upvotes: 1
Reputation: 32640
You can use the Hadoop FS API to check if the files exist before you pass them to spark.read:
conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
filtered_paths = [p for p in list_paths if Path(p).getFileSystem(conf).exists(Path(p))]
dataframe = spark.read.parquet(*filtered_paths)
Where sc is the SparkContext.
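If you are working from a SparkSession rather than a raw context, a minimal sketch of getting the pieces used above:
# The SparkContext (and through it the JVM gateway) is available on the session.
sc = spark.sparkContext
conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path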
Upvotes: 1
Reputation: 1824
Maybe you can do:
import os

existing_paths = [path for path in list_paths if os.path.exists(path)]
dataframe = spark.read.parquet(*existing_paths)
Note that os.path.exists only checks the driver's local filesystem, so this works for local paths but not for HDFS or S3 URIs.
Upvotes: 0