Nicolas Parra Avila

Reputation: 146

Ignore paths that do not exist in pyspark

I want to ignore the paths that generate the error:

'Path does not exist'

when I read parquet files with pyspark. For example, I have a list of paths:

list_paths = ['path1','path2','path3']

and read the files like:

dataframe = spark.read.parquet(*list_paths)

but the path path2 does not exist. In general, I do not know in advance which paths exist and which do not, so I want to ignore path2 automatically. How can I do that and still obtain a single dataframe?

Upvotes: 2

Views: 4018

Answers (3)

aparkerlue

Reputation: 1895

Adding to @blackbishop's answer, you can further use Hadoop pattern strings to check for files/objects before loading them.

It's also worth noting that spark.read.load() accepts lists of path strings.

from functools import partial
from typing import Iterator
from pyspark.sql import SparkSession


def iterhadoopfiles(spark: SparkSession, path_pattern: str) -> Iterator[str]:
    """Return iterator of object/file paths that match path_pattern."""
    sc = spark.sparkContext
    FileUtil = sc._gateway.jvm.org.apache.hadoop.fs.FileUtil
    Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
    hadoop_config = sc._jsc.hadoopConfiguration()
    p = Path(path_pattern)
    return (
        str(x)
        for x in FileUtil.stat2Paths(
            p.getFileSystem(hadoop_config).globStatus(p)
        )
    )


def pathnotempty(spark: SparkSession, path_pattern: str) -> bool:
    """Return true if path matches at least one object/file."""
    try:
        next(iterhadoopfiles(spark, path_pattern))
    except StopIteration:
        return False
    return True


paths_to_load = list(filter(partial(pathnotempty, spark), ["file:///*.parquet"]))
spark.read.format('parquet').load(paths_to_load)
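
For example, applied to the question's list_paths (path1/path2/path3 are the question's placeholder values, and spark is assumed to be an existing SparkSession), the same filter yields a single dataframe:

list_paths = ["path1", "path2", "path3"]

# Keep only the paths that match at least one file/object.
existing_paths = list(filter(partial(pathnotempty, spark), list_paths))

# Load everything that survived the filter into one dataframe.
dataframe = spark.read.format("parquet").load(existing_paths)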

Upvotes: 1

blackbishop

Reputation: 32640

You can use the Hadoop FS API to check whether the files exist before you pass them to spark.read:

conf = sc._jsc.hadoopConfiguration()
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path


filtered_paths = [p for p in list_paths if Path(p).getFileSystem(conf).exists(Path(p))]

dataframe = spark.read.parquet(*filtered_paths)

Where sc is the SparkContext.
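If you only have a SparkSession at hand (assuming it is named spark), the SparkContext can be obtained from it:

sc = spark.sparkContext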

Upvotes: 1

Vladimir Vargas

Reputation: 1824

Maybe you can do

import os  # note: os.path.exists only checks the local filesystem

existing_paths = [path for path in list_paths if os.path.exists(path)]
dataframe = spark.read.parquet(*existing_paths)

Upvotes: 0
