Reputation: 1340
I'm looking for a tool that can:
1. read new files as they are added to a directory,
2. also process the files that were already in the directory when the job started, and
3. continue from where it left off after a restart, without reprocessing files it has already handled.
I looked at Apache Spark: it can read newly added files and can handle restarts so that it continues from where it left off, but I couldn't find a way to make it also process the old files within the same job (so only 1 and 3).
I looked at Apache Flink: it does process both old and new files. However, once the job is restarted, it starts processing all of them again from scratch (so 1 and 2).
This seems like a very common use case. Am I missing something in Spark/Flink that makes it possible? Is there another tool that can be used here?
Upvotes: 1
Views: 1051
Reputation: 43
The best way is to maintain a state machine: keep a table or a file that lists all the files that have already been processed.
On startup, the application reads that list and keeps it in a set/map. Any new or old file can then be looked up and validated against it before being processed.
The ingestion folder also needs to reflect the state of each file: files that have been processed are renamed with some extension, failed files are moved to a failed folder, rejected files to a rejected folder, and so on.
You can do all of this using Spark/Flink; the technology is not the bottleneck here.
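As a minimal, hypothetical sketch of that ledger idea (plain Java; the class name, file format and paths are made up for illustration, not part of the answer):

// Keeps the names of already-processed files in a plain-text ledger so the
// application can rebuild its state after a restart.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ProcessedFileLedger {
    private final Path ledgerFile;                        // e.g. /data/state/processed-files.txt
    private final Set<String> processed = new HashSet<>();

    public ProcessedFileLedger(Path ledgerFile) throws IOException {
        this.ledgerFile = ledgerFile;
        if (Files.exists(ledgerFile)) {
            processed.addAll(Files.readAllLines(ledgerFile));   // rebuild state on startup
        }
    }

    public boolean isProcessed(String fileName) {
        return processed.contains(fileName);
    }

    public void markProcessed(String fileName) throws IOException {
        // append so the information survives a crash/restart
        Files.write(ledgerFile, Collections.singletonList(fileName),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        processed.add(fileName);
    }
}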
Upvotes: 1
Reputation: 947
I would recommend you modify the file ingestion a bit and incorporate Kafka: every time you put a new file into HDFS, also put a message onto a Kafka queue. Then use Spark Streaming to read the file names from the queue, read the files from HDFS, and process them.
Checkpointing is a real pain and also doesn't guarantee what you want. Kafka with Spark will be able to guarantee exactly-once semantics.
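For illustration, here is a rough sketch of that pipeline using the spark-streaming-kafka-0-10 integration; the broker address, topic name, class name and the per-line processing are placeholders I chose, not something the answer prescribes:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class FileNamesFromKafka {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("file-ingest");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        JavaSparkContext sc = jssc.sparkContext();

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "broker:9092");            // placeholder broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "file-ingest");

        // each Kafka message is assumed to carry the HDFS path of a newly added file
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("new-files"), kafkaParams)); // placeholder topic

        stream.map(ConsumerRecord::value)
              .foreachRDD(rdd -> {
                  for (String path : rdd.collect()) {      // runs on the driver
                      sc.textFile(path).foreach(line -> {
                          // placeholder for the real per-line processing
                      });
                  }
              });

        jssc.start();
        jssc.awaitTermination();
    }
}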
Flume has a SpoolDirSource; you can have a look at it as well.
Upvotes: 2
Reputation: 121
With Flink streaming you can process the files in a directory exactly as you suggested, and when you restart, it will continue processing from where it left off. The feature is called continuous file processing.
The only thing you have to do is 1) enable checkpointing for your job (see the note after the snippet) and 2) start your program with:
// scan the directory every 10 minutes for new files
Time period = Time.minutes(10);

env.readFile(inputFormat, "hdfs:// … /logs",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        period.toMilliseconds(),
        FilePathFilter.createDefaultFilter());
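For step 1), enabling checkpointing is a single call on the same StreamExecutionEnvironment (env) used above; the interval below is only an example value:

// enable periodic checkpoints so the continuous file reader can restore which
// files/splits it has already read after a restart
env.enableCheckpointing(60 * 1000);   // e.g. checkpoint every 60 seconds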
The feature is rather new and there is an active discussion on the dev mailing list about how to further improve its functionality.
Hope this helps!
Upvotes: 5