Amey P

Reputation: 57

Read parquet files created in the last 2 hours in PySpark

I want to read multiple parquet files in Azure Blob Storage created in the last 2 hours.

I can read multiple files with the below command

df = sqlContext.read.parquet("/path/*.parquet")

This query returns the data from all the parquet files present in that folder; now I want to get the data only from the parquet files created in the last 2 hours.

Please help me in getting the command right.

Upvotes: 2

Views: 1014

Answers (2)

blackbishop

Reputation: 32700

One way is to use the Hadoop FS API to list the files matching that path (globStatus), select those that were modified in the last two hours using getModificationTime, and pass the filtered list of files to the Spark DataFrame reader.

In PySpark you can access the Hadoop FS API through the JVM gateway like this:

import datetime


# Access the Hadoop FS classes through the JVM gateway
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()

data_path = "/path/*.parquet"

fs = Path(data_path).getFileSystem(conf)
# globStatus expands the wildcard pattern; listStatus expects a concrete directory path
file_status = fs.globStatus(Path(data_path))

# getModificationTime returns milliseconds since the epoch, so convert the cutoff accordingly
last_2hours_time = (datetime.datetime.now() - datetime.timedelta(hours=2)).timestamp() * 1000

created_last_2hours = [
    f.getPath().toString() for f in file_status if f.getModificationTime() >= last_2hours_time
]

df = spark.read.parquet(*created_last_2hours)

You may also want to take a look at the Azure Blob Storage Python SDK (azure-storage-blob) to list the files, as sketched below.
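
A minimal sketch, assuming the azure-storage-blob (v12) package; the connection string, container, account, and path prefix are placeholders you would replace with your own:

from datetime import datetime, timedelta, timezone
from azure.storage.blob import ContainerClient

# Placeholders: replace with your own connection string, container and account
container = ContainerClient.from_connection_string(
    conn_str="<connection-string>", container_name="<container>"
)

cutoff = datetime.now(timezone.utc) - timedelta(hours=2)

# list_blobs yields BlobProperties objects with a timezone-aware last_modified timestamp
recent_files = [
    "wasbs://<container>@<account>.blob.core.windows.net/" + blob.name
    for blob in container.list_blobs(name_starts_with="path/")
    if blob.name.endswith(".parquet") and blob.last_modified >= cutoff
]

df = spark.read.parquet(*recent_files)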

Upvotes: 3

Boris

Reputation: 501

The standard way of addressing this is to use Spark Structured Streaming with checkpointing.

Run it first over the entire data set, and then, with the same checkpoint, run your specific business logic every 2 hours:

  • The first run will populate the checkpoint data, so that you won't go over the same files during the second run.
  • You can make the first run with empty logic, such as doing nothing in the my_function() example below.
# checkpoint_path and path are placeholders for your checkpoint and target locations;
# a streaming file source normally also needs an explicit .schema(...)
spark.readStream.parquet("/path/*.parquet") \
    .writeStream \
    .trigger(once=True) \
    .outputMode('append') \
    .format("delta") \
    .option("checkpointLocation", checkpoint_path) \
    .start(path) \
    .awaitTermination()

OR something like

def my_function(df, epochId):
    # do anything here with the micro-batch DataFrame
    pass


spark.readStream.parquet("/path/*.parquet") \
    .writeStream \
    .trigger(once=True) \
    .foreachBatch(my_function) \
    .option('checkpointLocation', 'some_location') \
    .start() \
    .awaitTermination()
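
As a hedged sketch, my_function could simply append each micro-batch to a target location (the /path/to/target path is a placeholder); because of the checkpoint, each 2-hourly run only sees the files added since the previous run:

# Hypothetical foreachBatch handler: append each micro-batch of newly
# discovered files to a target location (placeholder path)
def my_function(df, epochId):
    df.write.format("delta").mode("append").save("/path/to/target")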

See the Spark Structured Streaming documentation for other examples.

Upvotes: 0
