Interested_Programmer

Reputation: 322

Can Azure Data Lake files be filtered based on last modified time using the Azure Python SDK?

I am trying to perform in-memory operations on files stored in Azure Data Lake. I am unable to find documentation on using a matching pattern without using the ADL Downloader.

For a single file, this is the code I use:

filename = '/<folder>/<filename>.json'
with adlsFileSystemClient.open(filename) as f:
    for line in f:
        <file-operations>

But how do we filter based on filename (string matching) or on last modified date?

When I used U-SQL, I had the option to filter the fileset based on the last modified date.

DECLARE EXTERNAL @TodaysTime DateTime = DateTime.UtcNow.AddDays(-1);

@rawInput=
    EXTRACT jsonString string,
            uri = FILE.URI()
            ,modified_date = FILE.MODIFIED()
    FROM @in
    USING Extractors.Tsv(quoting : true);



@parsedInput=
    SELECT *
    FROM @rawInput
    WHERE modified_date > @TodaysTime;

Are there any similar options to filter the files modified during a specified period when using adlsFileSystemClient?

Github Issue: https://github.com/Azure/azure-data-lake-store-python/issues/300

Any help is appreciated.

Upvotes: 1

Views: 2924

Answers (2)

Based on the code below, you can list the container-level directories and file names along with file properties, including last_modified, so you can filter files on the last_modified date.

from azure.storage.blob import BlockBlobService
from datetime import datetime

block_blob_service = BlockBlobService(account_name='<account_name>', account_key='<account_key>')
container_name = '<container_name>'
second_container_name = '<second_container_name>'
# block_blob_service.create_container(container_name)
generator = block_blob_service.list_blobs(container_name, prefix="Recovery/")
report_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

myfile = open('/dbfs/adlsaudit/auditfiles2', 'w')
for blob in generator:
    # Fetch the blob properties once per blob instead of three separate calls
    properties = block_blob_service.get_blob_properties(container_name, blob.name).properties
    last_modified = properties.last_modified
    file_size = properties.content_length
    # print("\t Recovery: " + blob.name + ":" + str(file_size) + ":" + str(last_modified))
    line = container_name + '|' + second_container_name + '|' + blob.name + '|' + str(file_size) + '|' + str(last_modified) + '|' + str(report_time)
    myfile.write(line + '\n')
myfile.close()
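
To actually restrict the output to recently modified blobs, here is a minimal filtering sketch along the same lines, assuming the same legacy BlockBlobService client, placeholder account/container names, and an arbitrary 24-hour window:

from datetime import datetime, timedelta, timezone
from azure.storage.blob import BlockBlobService

block_blob_service = BlockBlobService(account_name='<account_name>', account_key='<account_key>')
cutoff = datetime.now(timezone.utc) - timedelta(hours=24)

# last_modified on the listed blob properties is a timezone-aware UTC datetime
recent_blobs = [
    blob.name
    for blob in block_blob_service.list_blobs('<container_name>', prefix='Recovery/')
    if blob.properties.last_modified > cutoff
]
print(recent_blobs)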

Upvotes: 0

Interested_Programmer

Reputation: 322

Note:

This question was answered by akharit on GitHub recently. I am providing his answer below, which solves my requirement.

There isn't any built-in functionality in the ADLS SDK itself, as there is no server-side API that returns only the files modified within the last 4 hours. It should be easy to write that filtering yourself after you get the list of all entries. The modificationTime field is milliseconds since the Unix epoch, which you can convert to a Python datetime object with

from datetime import datetime, timedelta
datetime.fromtimestamp(file['modificationTime'] / 1000)

And then something like

filtered = [file['name'] for file in adl.ls('/', detail=True) if (datetime.now() - datetime.fromtimestamp(file['modificationTime'] / 1000)) < timedelta(hours=4)]

You can use walk instead of ls for recursive enumeration as well.

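As a rough sketch of how this ties back to the original question, the listing filter can be combined with a filename pattern match and the in-memory read loop. Here adlsFileSystemClient is the same client as in the question, and the /<folder> path and *.json pattern are placeholders:

import fnmatch
from datetime import datetime, timedelta

cutoff = datetime.now() - timedelta(hours=4)

# Keep *.json entries modified within the last 4 hours
matching = [
    entry['name']
    for entry in adlsFileSystemClient.ls('/<folder>', detail=True)
    if fnmatch.fnmatch(entry['name'], '*.json')
    and datetime.fromtimestamp(entry['modificationTime'] / 1000) > cutoff
]

# Read each matching file in memory, as in the original snippet
for filename in matching:
    with adlsFileSystemClient.open(filename) as f:
        for line in f:
            pass  # <file-operations>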

Upvotes: 1
