Reputation: 81
Is it possible to set the basePath option when reading partitioned data in Spark Structured Streaming (in Java)? I want to load only the data in a specific partition, such as basepath/x=1/, but I also want x to be loaded as a column. Setting basePath the way I would for a non-streaming dataframe doesn't seem to work.
Here's a minimal example. I have a dataframe containing the following data:
+---+---+
| a| b|
+---+---+
| 1| 2|
| 3| 4|
+---+---+
I wrote this as a Parquet file to a subdirectory named x=1.
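For reference, that write step might have looked like this (a minimal sketch; data and basePath are the same variables used in the snippets below):
data.write().parquet(basePath + "/x=1");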
The following code (with a regular non-streaming dataframe) works fine:
Dataset<Row> data = sparkSession.read()
        .option("basePath", basePath)
        .parquet(basePath + "/x=1");
data.show();
This produces the expected result:
+---+---+---+
| a| b| x|
+---+---+---+
| 1| 2| 1|
| 3| 4| 1|
+---+---+---+
However, the following (using the Structured Streaming API) doesn't work:
StructType schema = data.schema(); // data as defined above
Dataset<Row> streamingData = sparkSession.readStream()
        .schema(schema)
        .option("basePath", basePath)
        .parquet(basePath + "/x=1");
streamingData.writeStream()
        .trigger(Trigger.Once())
        .format("console")
        .start()
        .awaitTermination();
The dataframe, in this case, doesn't contain any rows:
+---+---+---+
| a| b| x|
+---+---+---+
+---+---+---+
Upvotes: 8
Views: 20560
Reputation: 323
I'm not sure if this will work for Spark Streaming, but it works for my batch processing in Scala. What I do is avoid using basePath entirely. For example, when my data is partitioned by year/month/day and I want to loop and process one day at a time, I use string interpolation to build each partition's path.
import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.util.Calendar

val dateStart: String = "01/14/2012"
val dateStop: String = "01/18/2012"
val format: SimpleDateFormat = new SimpleDateFormat("MM/dd/yyyy")

val d1 = new Timestamp(format.parse(dateStart).getTime())
val d2 = new Timestamp(format.parse(dateStop).getTime())

// Number of whole days between the two dates
val diffDays: Long = (d2.getTime() - d1.getTime()) / (24 * 60 * 60 * 1000)

val cal: Calendar = Calendar.getInstance()
cal.setTimeInMillis(d1.getTime())

for (i <- 0 to diffDays.toInt) {
  val year = cal.get(Calendar.YEAR)
  // Calendar.MONTH is zero-based, so add 1 to match month=1..12 partitions
  val month = cal.get(Calendar.MONTH) + 1
  val day = cal.get(Calendar.DAY_OF_MONTH)
  // Read just this day's partition directly, without basePath
  val dataframe1 = spark.read
    .load(s"s3://bucketName/somepath/year=$year/month=$month/day=$day")
  /*
  Do your dataframe manipulation here
  */
  // Advance to the next day
  cal.add(Calendar.DAY_OF_YEAR, 1)
}
You can do this with a list of strings or integers as well. If you need the partition values as columns, you can always append them to the dataframe. I'm not sure whether this would work for your case with Spark streaming, though.
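For illustration, appending those values in the Java API from the question could look like this (a minimal sketch using Spark's functions.lit; the df, year, month, and day names are assumptions standing in for the loop variables above):
import static org.apache.spark.sql.functions.lit;

// Hypothetical: df holds one day's data; year/month/day are the loop values
Dataset<Row> withPartitions = df
        .withColumn("year", lit(year))
        .withColumn("month", lit(month))
        .withColumn("day", lit(day));
Since lit produces a constant column, every row read from that day's directory is tagged with the same values, which mirrors what partition discovery via basePath would have produced.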
Upvotes: 1