Reputation: 389
I am new to Flink (Java) and am trying to process XML files that land on a NetApp file server, which is mounted as a file path on the server where Flink is installed.
How can I do batch or stream processing in (near) real time to pick up files arriving in that folder and sink them to S3?
I couldn't find any examples in the Flink starter material for reading files from the local file system. Is Flink at least a reasonable choice for this use case? If so, where can I find resources on listening to a folder and managing checkpoints/savepoints?
Upvotes: 0
Views: 230
Reputation: 389
Full working code for this question is in the following link. You need to enable checkpointing so that the .inprogress part files are finalized into completed files:

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000);
StreamingFileSink not ingesting data to s3
Upvotes: 0
Reputation: 43499
If your goal is simply to copy files to S3, there are simpler and more appropriate tools for that. Perhaps s3 sync (from the AWS CLI) is suitable.
Assuming it makes sense to use Flink (e.g., because you want to perform some stateful transformation on the data), it will need to be the case that all of your task managers (workers) can access the files to be processed using the same URI. You can use a file:// URI for this purpose.
You can do something like this to monitor a directory and ingest new files as they appear:
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // monitor the directory, checking for new files
    // every 100 milliseconds
    TextInputFormat format = new TextInputFormat(
            new org.apache.flink.core.fs.Path("file:///tmp/dir/"));

    DataStream<String> inputStream = env.readFile(
            format,
            "file:///tmp/dir/",
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            100,
            FilePathFilter.createDefaultFilter());
Note this warning from the documentation:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
This means you should atomically move files that are ready to be ingested into the folder being watched.
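One way to do that atomic move is with the JDK's java.nio.file API. Here is a minimal sketch (the class name, method name, and paths are hypothetical, for illustration only): the file is written in a staging directory first, then moved into the watched directory in one atomic step, so Flink never observes a partially written file.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class AtomicStage {

        // Move a fully written file from a staging directory into the
        // directory that Flink is watching, as a single atomic operation.
        public static Path stage(Path stagingFile, Path watchedDir) throws IOException {
            Path target = watchedDir.resolve(stagingFile.getFileName());
            return Files.move(stagingFile, target, StandardCopyOption.ATOMIC_MOVE);
        }

        public static void main(String[] args) throws IOException {
            Path staging = Files.createTempDirectory("staging");
            Path watched = Files.createTempDirectory("watched");

            // write the whole file outside the watched directory first
            Path f = Files.write(staging.resolve("data.xml"), "<doc/>".getBytes());

            // then publish it atomically
            Path moved = stage(f, watched);
            System.out.println(Files.exists(moved));
        }
    }

Note that ATOMIC_MOVE only works when the staging directory and the watched directory are on the same file system; otherwise Files.move throws AtomicMoveNotSupportedException.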
You can use the StreamingFileSink to write to S3. Flink's simpler write operations, such as writeUsingOutputFormat(), do not participate in checkpointing, so they are not a good choice in this case.
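A minimal sketch of such a sink (assuming one of Flink's S3 filesystem plugins, flink-s3-fs-hadoop or flink-s3-fs-presto, is on the classpath; the bucket name is hypothetical):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    // row-format sink that writes each record as a line of UTF-8 text;
    // checkpointing must be enabled for in-progress part files to be
    // committed to finished files
    StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("s3://my-bucket/output"),
                          new SimpleStringEncoder<String>("UTF-8"))
            .build();

    inputStream.addSink(sink);

Part files remain in an in-progress state until a checkpoint completes, which is why enabling checkpointing (as in the other answer) is essential here.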
Upvotes: 1