Art

Reputation: 1340

Looking for a way to continuously process files written to HDFS

I'm looking for a tool that can:

  1. Monitor an HDFS directory for new files and process them as they appear.
  2. Also process files that were already in the directory before the job/app started.
  3. Keep checkpoints so that it can continue from where it left off after a restart.

I looked at Apache Spark: it can read newly added files and can handle restarts by continuing from where it left off, but I couldn't find a way to make it also process the files that were already in the directory within the same job (so only 1 and 3).
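For reference, a minimal sketch of the kind of Spark Streaming job I mean (the paths and class name are hypothetical); textFileStream covers requirements 1 and 3 but never reads pre-existing files:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class HdfsFileMonitor {
        public static void main(String[] args) throws Exception {
            // For real restart recovery this would go through
            // JavaStreamingContext.getOrCreate with the checkpoint dir.
            SparkConf conf = new SparkConf().setAppName("hdfs-file-monitor");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(30));
            ssc.checkpoint("hdfs:///checkpoints/hdfs-file-monitor"); // requirement 3

            // Requirement 1: picks up files that appear after the job starts;
            // files already in the directory (requirement 2) are never read.
            ssc.textFileStream("hdfs:///ingestion/inbox")
               .foreachRDD(rdd -> rdd.foreach(line -> System.out.println(line)));

            ssc.start();
            ssc.awaitTermination();
        }
    }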

I looked at Apache Flink: it does process both old and new files. However, once the job is restarted, it starts processing all of them from scratch again (so only 1 and 2).

This seems like it should be a very common use case. Am I missing something in Spark/Flink that makes it possible? Is there another tool that can be used here?

Upvotes: 1

Views: 1051

Answers (3)

sandip

Reputation: 43

The best way is to maintain a state machine: keep a table or a file that lists every file that has already been processed.

On start, the application reads that list and keeps it in a set/map. Any file, new or old, can then be looked up and validated against it before processing.

The ingestion folder also needs to reflect each file's state: files that have been processed are renamed with some extension, failed files are moved to a failed folder, rejected files to a rejected folder, and so on.

You can do all of this using Spark/Flink; the technology is not the bottleneck here.
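A minimal sketch of that ledger idea using the Hadoop FileSystem API, assuming a plain-text ledger file; the paths, the .done suffix, and the process() hand-off are all hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.nio.charset.StandardCharsets;
    import java.util.HashSet;
    import java.util.Set;

    public class IngestionLedger {
        private static final Path LEDGER = new Path("hdfs:///ingestion/processed.list");
        private static final Path INBOX  = new Path("hdfs:///ingestion/inbox");

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());

            // On start, load the names of already-processed files into a set.
            Set<String> processed = new HashSet<>();
            if (fs.exists(LEDGER)) {
                try (BufferedReader r = new BufferedReader(
                        new InputStreamReader(fs.open(LEDGER), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = r.readLine()) != null) processed.add(line);
                }
            }

            // Validate every file, old or new, against the set; skip known ones.
            for (FileStatus status : fs.listStatus(INBOX)) {
                String name = status.getPath().getName();
                if (name.endsWith(".done") || !processed.add(name)) continue;

                process(status.getPath()); // hand off to the Spark/Flink job

                // Mark the file's state in the folder itself, as suggested above.
                fs.rename(status.getPath(), status.getPath().suffix(".done"));
            }

            // Rewrite the ledger in full (append support varies by cluster,
            // so a full rewrite is the simpler choice in this sketch).
            try (PrintWriter w = new PrintWriter(
                    new OutputStreamWriter(fs.create(LEDGER, true), StandardCharsets.UTF_8))) {
                for (String name : processed) w.println(name);
            }
        }

        private static void process(Path file) {
            // Placeholder: submit the file to the actual processing job here.
        }
    }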

Upvotes: 1

RBanerjee

Reputation: 947

I would recommend you modify the file ingestion a bit and incorporate Kafka: every time you put a new file into HDFS, also put a message on a Kafka topic. Then use Spark Streaming to read the file names from the topic, read the files from HDFS, and process them.
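A minimal sketch of that wiring using the spark-streaming-kafka-0-10 integration; the broker address, topic name, and group id are hypothetical, and each Kafka message is assumed to carry one HDFS path:

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class KafkaFileFeed {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("kafka-file-feed");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(30));

            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "broker:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "file-feed");
            // "earliest" also drains events queued before the job started.
            kafkaParams.put("auto.offset.reset", "earliest");

            KafkaUtils.createDirectStream(ssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(
                            Collections.singleton("hdfs-file-events"), kafkaParams))
                .map(record -> record.value()) // each message carries one HDFS path
                .foreachRDD(batch -> {
                    // Runs on the driver: fetch and process each named file.
                    for (String path : batch.collect()) {
                        System.out.println("would process " + path); // e.g. sc.textFile(path)
                    }
                });

            ssc.start();
            ssc.awaitTermination();
        }
    }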

Checkpointing is a real pain and doesn't guarantee what you want anyway. Kafka with Spark will be able to guarantee exactly-once semantics.

Flume has a SpoolDirSource; you can have a look at that as well.

Upvotes: 2

Kostas Kloudas

Reputation: 121

With Flink streaming you can process files in a directory exactly as you suggest, and when you restart, it will continue from where it left off. The feature is called continuous file processing.

The only things you have to do are 1) enable checkpointing for your job and 2) start your program with a continuously monitoring file source:

    // 1) enable checkpointing (the 60s interval is just an example)
    env.enableCheckpointing(60000);

    // 2) monitor the directory continuously, rescanning every 10 minutes
    Time period = Time.minutes(10);
    env.readFile(inputFormat, "hdfs:// … /logs",
                 FileProcessingMode.PROCESS_CONTINUOUSLY,
                 period.toMilliseconds(),
                 FilePathFilter.createDefaultFilter());

The feature is rather new, and there is an active discussion on the dev mailing list about how to further improve its functionality.

Hope this helps!

Upvotes: 5
