Reputation: 322
I am trying to perform in-memory operations on files stored in Azure Data Lake. I am unable to find documentation on using a matching pattern without going through the ADL Downloader.
For a single file, this is the code I use
filename = '/<folder>/<filename>.json'
with adlsFileSystemClient.open(filename) as f:
    for line in f:
        <file-operations>
But how do we filter based on filename (string matching) or on the last modified date?
When I used U-SQL, I had the option to filter the fileset on the last modified date.
DECLARE EXTERNAL @TodaysTime = DateTime.UtcNow.AddDays(-1);

@rawInput =
    EXTRACT jsonString string,
            uri = FILE.URI(),
            modified_date = FILE.MODIFIED()
    FROM @in
    USING Extractors.Tsv(quoting : true);

@parsedInput =
    SELECT *
    FROM @rawInput
    WHERE modified_date > @TodaysTime;
Are there any similar options to filter files modified during a specified period when using adlsFileSystemClient?
Github Issue: https://github.com/Azure/azure-data-lake-store-python/issues/300
Any help is appreciated.
Upvotes: 1
Views: 2924
Reputation: 2334
With the code below, you can list the container-level directories and file names along with their file properties, including the last_modified date, so you can filter files based on the last_modified date.
from azure.storage.blob import BlockBlobService
from datetime import datetime

block_blob_service = BlockBlobService(account_name='account_name', account_key='account_key')
container_name = 'container_name'
second_container_name = 'second_container_name'
# block_blob_service.create_container(container_name)

# List every blob under the "Recovery/" prefix in the container
generator = block_blob_service.list_blobs(container_name, prefix="Recovery/")
report_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

myfile = open('/dbfs/adlsaudit/auditfiles2', 'w')
for blob in generator:
    # Fetch the blob's properties once: size and last-modified timestamp
    properties = block_blob_service.get_blob_properties(container_name, blob.name).properties
    file_size = properties.content_length
    last_modified = properties.last_modified
    # print("\t Recovery: " + blob.name + ":" + str(file_size) + ":" + str(last_modified))
    line = container_name + '|' + second_container_name + '|' + blob.name + '|' + str(file_size) + '|' + str(last_modified) + '|' + str(report_time)
    myfile.write(line + '\n')
myfile.close()
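For example, here is a rough sketch (my addition, not part of the listing above) that reuses block_blob_service and container_name to keep only blobs modified in the last 24 hours; it assumes last_modified comes back as a timezone-aware UTC datetime, as it does in this legacy azure-storage SDK:

from datetime import datetime, timedelta, timezone

# Assumed cutoff: blobs modified within the last 24 hours
cutoff = datetime.now(timezone.utc) - timedelta(hours=24)

recent_blobs = [
    blob.name
    for blob in block_blob_service.list_blobs(container_name, prefix="Recovery/")
    if block_blob_service.get_blob_properties(container_name, blob.name).properties.last_modified > cutoff
]
print(recent_blobs)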
Upvotes: 0
Reputation: 322
Note:
This question was answered by akharit on GitHub recently. I am providing his answer, which solves my requirement, below.
There isn't any built-in functionality in the ADLS SDK itself, as there is no server-side API that will return only files modified within the last 4 hours. It should be easy to write the code to do that after you get the list of all entries. The modification time field returns milliseconds since the Unix epoch, which you can convert to a Python datetime object by
from datetime import datetime, timedelta
datetime.fromtimestamp(file['modificationTime'] / 1000)
And then something like
filtered = [
    file['name']
    for file in adl.ls('/', detail=True)
    if (datetime.now() - datetime.fromtimestamp(file['modificationTime'] / 1000)) < timedelta(hours=4)
]
You can use walk instead of ls for recursive enumeration as well.
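To cover the filename (string matching) part of my question as well, here is a rough sketch along the same lines. It is my own addition, assuming adl is the same AzureDLFileSystem client as adlsFileSystemClient above and that the files sit under a /<folder> path:

from fnmatch import fnmatch
from datetime import datetime, timedelta

cutoff = datetime.now() - timedelta(hours=4)

# ls(detail=True) returns dicts with 'name' and 'modificationTime' (ms since the Unix epoch)
recent_json = [
    f['name']
    for f in adl.ls('/<folder>', detail=True)
    if fnmatch(f['name'], '*.json')
    and datetime.fromtimestamp(f['modificationTime'] / 1000) > cutoff
]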
Upvotes: 1