himanshuIIITian
himanshuIIITian

Reputation: 6085

Read all Parquet files saved in a folder via Spark

I have a folder containing Parquet files. Something like this:

scala> val df = sc.parallelize(List(1,2,3,4)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.write.parquet("/tmp/test/df/1.parquet")

scala> val df = sc.parallelize(List(5,6,7,8)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.write.parquet("/tmp/test/df/2.parquet")

After saving dataframes when I go to read all parquet files in df folder, it gives me error.

scala> val read = spark.read.parquet("/tmp/test/df")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
  ... 48 elided

I know I can read Parquet files by giving full path, but it would be better if there is a way to read all parquet files in a folder.

Upvotes: 22

Views: 71392

Answers (3)

Ihor Konovalenko
Ihor Konovalenko

Reputation: 1407

You can write data into folder not as separate Spark "files" (in fact folders) 1.parquet, 2.parquet etc. If don't set file name but only path, Spark will put files into the folder as real files (not folders), and automatically name that files.

df1.write.partitionBy("countryCode").format("parquet").mode("overwrite").save("/tmp/data1/")
df2.write.partitionBy("countryCode").format("parquet").mode("append").save("/tmp/data1/")
df3.write.partitionBy("countryCode").format("parquet").mode("append").save("/tmp/data1/")

Further we can read data from all files in data folder:

val df = spark.read.format("parquet").load("/tmp/data1/")

Upvotes: 1

eliasah
eliasah

Reputation: 40360

Spark doesn't write/read parquet the way you think it does.

It uses the Hadoop library to write/read partitioned parquet file.

Thus your first parquet file is under the path /tmp/test/df/1.parquet/ where 1.parquet is a directory. This means that when reading from parquet you would need to provide the path to your parquet directory or path if it's one file.

val df = spark.read.parquet("/tmp/test/df/1.parquet/")

I advice you to read the official documentation for more details. [cf. SQL Programming Guide - Parquet Files]

EDIT:

You must be looking for something like this :

scala> sqlContext.range(1,100).write.save("/tmp/test/df/1.parquet")

scala> sqlContext.range(100,500).write.save("/tmp/test/df/2.parquet")

scala> val df = sqlContext.read.load("/tmp/test/df/*")
// df: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df.show(3)
// +---+
// | id|
// +---+
// |400|
// |401|
// |402|
// +---+
// only showing top 3 rows

scala> df.count
// res3: Long = 499

You can also use wildcards in your file paths URI.

And you can provide multiple files paths as followed :

scala> val df2 = sqlContext.read.load("/tmp/test/df/1.parquet","/tmp/test/df/2.parquet")
// df2: org.apache.spark.sql.DataFrame = [id: bigint]

scala> df2.count
// res5: Long = 499

Upvotes: 24

koiralo
koiralo

Reputation: 23099

The file you wrote on /tmp/test/df/1.parquet and /tmp/test/df/2.parquet are not a output file they are output Directory. so, you can read the parquet is

val data = spark.read.parquet("/tmp/test/df/1.parquet/")

Upvotes: 1

Related Questions