John

Reputation: 1167

PySpark - read recursive Hive table

I have a Hive table that has multiple sub-directories in HDFS, something like:

/hdfs_dir/my_table_dir/my_table_sub_dir1
/hdfs_dir/my_table_dir/my_table_sub_dir2
...

Normally I set the following parameters before I run a Hive script:

set hive.input.dir.recursive=true;
set hive.mapred.supports.subdirectories=true;
set hive.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

select * from my_db.my_table;

I'm trying to do the same using PySpark:

conf = (SparkConf().setAppName("My App")
        ...
        .set("hive.input.dir.recursive", "true")
        .set("hive.mapred.supports.subdirectories", "true")
        .set("hive.supports.subdirectories", "true")
        .set("mapred.input.dir.recursive", "true"))

sc = SparkContext(conf = conf)

sqlContext = HiveContext(sc)

my_table = sqlContext.sql("select * from my_db.my_table")

and end up with an error like:

java.io.IOException: Not a file: hdfs://hdfs_dir/my_table_dir/my_table_sub_dir1

What's the correct way to read a Hive table with sub-directories in Spark?

Upvotes: 2

Views: 4599

Answers (3)

roderrock89

Reputation: 11

Try setting them through the SparkSession builder before executing the query:

sparkSession = (SparkSession
                        .builder
                        .appName('USS - Unified Scheme of Sells')
                        .config("hive.metastore.uris", "thrift://probighhwm001:9083", conf=SparkConf())
                        .config("hive.input.dir.recursive", "true")
                        .config("hive.mapred.supports.subdirectories", "true")
                        .config("hive.supports.subdirectories", "true")
                        .config("mapred.input.dir.recursive", "true")
                        .enableHiveSupport()
                        .getOrCreate()
                        )
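With a session built this way, the query from the question could then be run through the same object, for example (table name taken from the question):

# Run the query through the Hive-enabled session built above
my_table = sparkSession.sql("select * from my_db.my_table")
my_table.show()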

Upvotes: 0

Paul

Reputation: 7355

What I have found is that these values must be prefixed with spark., as in:

.set("spark.hive.mapred.supports.subdirectories","true")
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")

Upvotes: 1

user3124185

Reputation: 11

Try setting them through sqlContext.sql() before executing the query:

sqlContext.sql("SET hive.mapred.supports.subdirectories=true")
sqlContext.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")
my_table = sqlContext.sql("select * from my_db.my_table")
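If the other settings listed in the question are also required, they could presumably be issued the same way (untested):

sqlContext.sql("SET hive.input.dir.recursive=true")
sqlContext.sql("SET hive.supports.subdirectories=true")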

Upvotes: 0
