Reputation: 57
I want to read multiple parquet files in Azure Blob Storage created in the last 2 hours.
I can read multiple files with the command below:
df = sqlContext.read.parquet("/path/*.parquet")
This returns the data from all the parquet files in that folder. Now I want to get the data only from the parquet files created in the last 2 hours.
Please help me get the command right.
Upvotes: 2
Views: 1014
Reputation: 32700
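One way is to use the Hadoop FS API: list the files under that folder with listStatus (or globStatus when the path contains a wildcard), select those that were modified in the last two hours using getModificationTime, and pass the filtered list of files to the Spark DataFrame reader. Note that this filters on modification time, because the Hadoop FileStatus object does not expose a creation time.
In PySpark you can access the Hadoop FileSystem through the JVM gateway like this: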
import datetime

# Reach the Hadoop FileSystem classes through the py4j JVM gateway
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()

data_path = "/path/*.parquet"
fs = Path(data_path).getFileSystem(conf)
# globStatus expands the wildcard; listStatus expects a plain directory or file path
file_status = fs.globStatus(Path(data_path))

# getModificationTime returns milliseconds since the epoch
last_2hours_time = (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp() * 1000
created_last_2hours = [
    f.getPath().toString() for f in file_status if f.getModificationTime() >= last_2hours_time
]
df = spark.read.parquet(*created_last_2hours)
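The cutoff comparison used above can be pulled out into a small helper, which makes it easy to check without a cluster and to notice when no file qualifies (reading with an empty list of paths typically fails because Spark has nothing to infer a schema from). This is only a sketch: the name recent_paths and the (path, modification time in milliseconds) pair shape are assumptions made for illustration, not part of the Hadoop or Spark API.

```python
import datetime


def recent_paths(files, hours=2, now=None):
    """Return paths whose modification time falls within the last `hours` hours.

    `files` is an iterable of (path, modification_time_ms) pairs, where the
    time is milliseconds since the epoch, as returned by getModificationTime.
    """
    now = now or datetime.datetime.now(datetime.timezone.utc)
    cutoff_ms = (now - datetime.timedelta(hours=hours)).timestamp() * 1000
    return [path for path, mtime_ms in files if mtime_ms >= cutoff_ms]


# Example with made-up paths and timestamps
now = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
now_ms = int(now.timestamp() * 1000)
hour_ms = 3600 * 1000
files = [
    ("/path/a.parquet", now_ms - 1 * hour_ms),  # 1 hour old: kept
    ("/path/b.parquet", now_ms - 3 * hour_ms),  # 3 hours old: dropped
]
selected = recent_paths(files, now=now)
```

With the Hadoop listing, the pairs could be built as [(f.getPath().toString(), f.getModificationTime()) for f in file_status], and the read wrapped in a check such as "if selected:" before calling spark.read.parquet(*selected).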
You may also want to take a look at the Python package for Azure Blob Storage to list the files.
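A rough sketch of that second route, assuming the azure-storage-blob (v12) package, whose ContainerClient.list_blobs() yields blob properties carrying a name and a creation_time. Unlike the Hadoop approach, this filters on creation time, which is what the question asks for. The function names, the ".parquet" suffix check, and the connection details are assumptions for illustration; the filtering is kept separate from the Azure call so it can be tried offline with stand-in objects.

```python
import datetime
from types import SimpleNamespace


def blobs_created_since(blobs, hours=2, now=None):
    """Return names of parquet blobs created within the last `hours` hours.

    `blobs` is any iterable of objects with `name` and timezone-aware
    `creation_time` attributes, such as the items yielded by
    ContainerClient.list_blobs().
    """
    now = now or datetime.datetime.now(datetime.timezone.utc)
    cutoff = now - datetime.timedelta(hours=hours)
    return [
        b.name
        for b in blobs
        if b.name.endswith(".parquet") and b.creation_time >= cutoff
    ]


def list_recent_parquet(conn_str, container, prefix):
    """List recent parquet blobs in a container (needs azure-storage-blob)."""
    # Imported lazily so the filter above can be used without the package
    from azure.storage.blob import ContainerClient

    client = ContainerClient.from_connection_string(conn_str, container_name=container)
    return blobs_created_since(client.list_blobs(name_starts_with=prefix))


# Offline check with stand-in objects instead of real blob properties
now = datetime.datetime(2024, 1, 1, 12, 0, tzinfo=datetime.timezone.utc)
fake_blobs = [
    SimpleNamespace(name="path/new.parquet", creation_time=now - datetime.timedelta(minutes=30)),
    SimpleNamespace(name="path/old.parquet", creation_time=now - datetime.timedelta(hours=5)),
    SimpleNamespace(name="path/_SUCCESS", creation_time=now - datetime.timedelta(minutes=30)),
]
recent = blobs_created_since(fake_blobs, now=now)
```

The returned blob names still have to be turned into paths that Spark can read, for example by prefixing the mount point or storage URL your cluster uses, before passing them to spark.read.parquet.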
Upvotes: 3
Reputation: 501
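The standard way to address this is to use Spark Structured Streaming with checkpointing.
Run it first over the entire data, then run your specific business logic (my_function() in the second example below) every 2 hours with the same checkpoint, so that each run only processes files that arrived since the previous one:
# File stream sources need an explicit schema unless schema inference is enabled
schema = spark.read.parquet("/path/*.parquet").schema
# checkpoint_path and path are placeholders for your own locations
spark.readStream.schema(schema).parquet("/path/*.parquet") \
    .writeStream \
    .trigger(once=True) \
    .outputMode('append') \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .start(path) \
    .awaitTermination()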
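Or something like:
def my_function(df, epochId):
    # do anything here; df holds the files not yet recorded in the checkpoint
    pass

# File stream sources need an explicit schema unless schema inference is enabled
schema = spark.read.parquet("/path/*.parquet").schema
spark.readStream.schema(schema).parquet("/path/*.parquet") \
    .writeStream \
    .trigger(once=True) \
    .foreachBatch(my_function) \
    .option('checkpointLocation', 'some_location') \
    .start() \
    .awaitTermination()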
See other examples
Upvotes: 0