lbz

Reputation: 81

Specifying "basePath" option in Spark Structured Streaming

Is it possible to set the basePath option when reading partitioned data in Spark Structured Streaming (in Java)? I want to load only the data in a specific partition, such as basepath/x=1/, but I also want x to be loaded as a column. Setting basePath the way I would for a non-streaming dataframe doesn't seem to work.

Here's a minimal example. I have a dataframe containing the following data:

+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

I wrote this as a Parquet file to a subdirectory named x=1.

The following code (with a regular non-streaming dataframe) works fine:

Dataset<Row> data = sparkSession.read()
  .option("basePath", basePath)
  .parquet(basePath + "/x=1");

data.show();

This produces the expected result:

+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  2|  1|
|  3|  4|  1|
+---+---+---+

However, the following (using the Structured Streaming API) doesn't work:

StructType schema = data.schema(); // data as defined above

Dataset<Row> streamingData = sparkSession.readStream()
  .schema(schema)
  .option("basePath", basePath)
  .parquet(basePath + "/x=1");

streamingData.writeStream()
  .trigger(Trigger.Once())  // process all data available now in one batch, then stop
  .format("console")
  .start().awaitTermination();

The dataframe, in this case, doesn't contain any rows:

+---+---+---+
|  a|  b|  x|
+---+---+---+
+---+---+---+

Upvotes: 8

Views: 20560

Answers (1)

Med Zamrik

Reputation: 323

I'm not sure whether this will work for Spark Streaming, but it works for my batch processing in Scala. I would avoid using basePath entirely. For example, when my data is partitioned by year/month/day and I want to process one day at a time, I loop over the dates and build each partition path with string interpolation:

import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.util.Calendar

val dateStart: String = "01/14/2012"
val dateStop: String = "01/18/2012"

val format: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy")

val d1 = new Timestamp(format.parse(dateStart).getTime())
val d2 = new Timestamp(format.parse(dateStop).getTime())

val diffDays: Long = (d2.getTime() - d1.getTime()) / (24 * 60 * 60 * 1000)

val cal: Calendar = Calendar.getInstance()
cal.setTimeInMillis(d1.getTime())
for (i <- 0 to diffDays.toInt) {
    val year = cal.get(Calendar.YEAR)
    // Calendar.MONTH is zero-based; add 1, assuming the partitions use months 1-12
    val month = cal.get(Calendar.MONTH) + 1
    val day = cal.get(Calendar.DAY_OF_MONTH)
    val dataframe1 = spark.read
        .load(s"s3://bucketName/somepath/year=$year/month=$month/day=$day")
    /*
    Do your dataframe manipulation here
    */
    cal.add(Calendar.DAY_OF_YEAR, 1)
}

You can do this with a list of strings or integers as well. If you need the partition value as a column, you can always append it to the dataframe as a new column (sketched below). I'm not sure whether this would work for your case with Spark streaming, though.
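For example, here's a minimal sketch of appending the partition values as columns, assuming the dataframe1 and the year/month/day values from the loop above (the column names are illustrative):

import org.apache.spark.sql.functions.lit

// The partition values live in the directory path, not in the files,
// so attach them to the dataframe as literal columns.
val withPartitions = dataframe1
    .withColumn("year", lit(year))
    .withColumn("month", lit(month))
    .withColumn("day", lit(day))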

Upvotes: 1
