Reputation: 363
I am trying to copy data from S3 to HDFS using ListS3, FetchS3Object and PutHDFS. The data in the S3 bucket is structured as follows, and I need to copy it to HDFS with the same folder structure (the folder names are dynamic).
bucketname/parent-folder1/subfolder1/filename1.txt
bucketname/parent-folder1/subfolder2/filename2.txt
bucketname/parent-folder2/subfolder1/filename3.txt
The PutHDFS processor is showing the following error:
org.apache.nifi.processor.exception.ProcessException: Copied file to HDFS but could not rename dot file /dev/.parent-folder1/subfolder1/filename1.txt to its final filename
I understand that folders are virtual in S3. It works if I introduce an UpdateAttribute processor (${filename:replaceAll("/", "-")}), but then the folder structure is not created in HDFS. What are the other options? Is there a template for this?
Some doubts on error handling:
1) The ListS3 processor maintains state. What happens when ListS3 and FetchS3Object are successful but PutHDFS fails? Will ListS3 list the file again, or is it up to the developer to handle the exception? Is it possible to reuse the flow file loaded by FetchS3Object?
2) How does an end user know which copies succeeded and which failed?
Thanks Tilak
Upvotes: 0
Views: 1363
Reputation: 18630
I think the issue is that the "filename" attribute of the flow files coming out of FetchS3Object is set to something like "parent-folder1/subfolder1/filename1.txt", but PutHDFS needs this value to be just "filename1.txt".
You could check this by stopping PutHDFS, waiting until a flow file is in its incoming queue, then listing the queue and looking at the flow file's attributes to see what filename is set to.
If that is the case, then you could use an UpdateAttribute processor before PutHDFS to set filename = ${filename:substringAfterLast('/')}.
Then in PutHDFS, set the Directory property to "/dev/${path}", or whatever attribute holds the path that came from the bucket.
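For example, a minimal configuration along these lines should work, assuming the incoming filename attribute still holds the full S3 key; "path" here is a new attribute created in UpdateAttribute (not something FetchS3Object sets for you), and "/dev" is just the root taken from your error message:

UpdateAttribute (before PutHDFS):
    path     = ${filename:substringBeforeLast('/')}
    filename = ${filename:substringAfterLast('/')}

PutHDFS:
    Directory = /dev/${path}

PutHDFS should create the directory if it does not already exist, so the parent-folder/subfolder layout from the bucket gets recreated on HDFS.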
To answer your specific questions...
1) ListS3 has no knowledge of whether things downstream of it succeed or not, so it will not retry or reset its state.
2) You should know success or failure based on the relationships of PutHDFS. You should route the failure relationship somewhere so that the data can be reprocessed or retried.
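For instance, one common pattern (a sketch, not the only way to do it) is to loop the failure relationship back into PutHDFS so transient errors are retried, and also branch it to a LogAttribute or PutEmail processor so someone actually sees the failures; the success relationship can go to LogAttribute for auditing or be auto-terminated once you trust the flow:

PutHDFS -- success --> LogAttribute (audit) or auto-terminate
PutHDFS -- failure --> PutHDFS (retry loop) and/or LogAttribute / PutEmail (alerting)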
Upvotes: 2