Eyad

Reputation: 73

textFileStream in Spark

I have the following code:

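import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf sparkConf = new SparkConf().setAppName("My app")
        .setMaster("local[4]")
        .set("spark.executor.memory", "2g")
        .set("spark.driver.allowMultipleContexts", "true");

// Batch interval of 2 seconds: the directory is checked for new files every 2 s
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

// Watches "filesDirectory" for files that appear after the context is started
JavaDStream<String> trainingData = jssc.textFileStream("filesDirectory");

trainingData.print();

jssc.start();
jssc.awaitTermination(); // blocks until the context is stopped; throws InterruptedException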

Unfortunately, for a file that already exists in the directory to be streamed, I have to edit it and rename it after starting the streaming context; otherwise it is not processed.

Do I have to edit and rename each file to get it processed, or is there another way to process the existing files, for example by just editing and saving them?

P.S. Even when I move a new file into this directory, I still have to edit and rename it before it is streamed!

Upvotes: 1

Views: 2343

Answers (2)

Pankaj Arora

Reputation: 544

Try touching the file (updating its modification time) before moving it into the destination directory. Below is what the doc comment in FileInputDStream says:

  Identify whether the given path is a new file for the batch of currentTime. For it to be accepted, it has to pass the following criteria.
    • It must pass the user-provided file filter.
    • It must be newer than the ignore threshold. It is assumed that files older than the ignore threshold have already been considered or are existing files before start (when newFileOnly = true).
    • It must not be present in the recently selected files that this class remembers.
    • It must not be newer than the time of the batch (i.e. currentTime for which this file is being tested). This can occur if the driver was recovered, and the missing batches (during downtime) are being generated. In that case, a batch of time T may be generated at time T+x. Say x = 5. If that batch T contains a file of mod time T+5, then bad things can happen. Let's say the selected files are remembered for 60 seconds. At time T+61, the batch of time T is forgotten, and the ignore threshold is still T+1. The files with mod time T+5 are not remembered and cannot be ignored (since T+5 > T+1). Hence they can get selected as new files again. To prevent this, files whose mod time is more than the current batch time are not considered.
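To make these criteria concrete, here is a simplified, hypothetical model of them in plain Java. It is not Spark's FileInputDStream code: the class NewFileCheck and its methods are made up for illustration, and it assumes a single fixed ignore threshold (the stream's start time when newFilesOnly is true, otherwise 0), whereas the real implementation also moves the threshold forward with its remember window. It shows why a file that existed before the stream started is skipped until its modification time is refreshed.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of the selection criteria quoted above.
 * This is NOT Spark's FileInputDStream implementation. Times are epoch millis.
 */
class NewFileCheck {

    private final long ignoreThreshold;                           // files modified at or before this are skipped
    private final Set<String> recentlySelected = new HashSet<>(); // files already handed to a batch

    /**
     * @param startTime    when the streaming context started
     * @param newFilesOnly true mirrors textFileStream: anything modified before the start is ignored
     */
    NewFileCheck(long startTime, boolean newFilesOnly) {
        // Simplification: Spark also advances this threshold with its remember window.
        this.ignoreThreshold = newFilesOnly ? startTime : 0L;
    }

    /** Returns true if the file would be picked up by the batch for batchTime. */
    boolean isNewFile(String name, long modTime, long batchTime) {
        if (name.startsWith(".")) return false;            // stand-in for the file filter (hidden files)
        if (modTime <= ignoreThreshold) return false;      // older than the ignore threshold
        if (recentlySelected.contains(name)) return false; // already selected recently
        if (modTime > batchTime) return false;             // newer than the batch being generated
        recentlySelected.add(name);                        // model only: remember it once accepted
        return true;
    }

    public static void main(String[] args) {
        NewFileCheck check = new NewFileCheck(1_000L, true);
        // A file that existed before the stream started is skipped...
        System.out.println(check.isNewFile("data.txt", 500L, 3_000L));   // false
        // ...but once it is touched (mod time refreshed after the start), it is accepted.
        System.out.println(check.isNewFile("data.txt", 2_500L, 3_000L)); // true
    }
}
```

In this model, refreshing the modification time is what lets a pre-existing file pass the ignore-threshold check.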

Upvotes: 4

Mohitt

Reputation: 2977

JavaStreamingContext.textFileStream returns a JavaDStream backed by a FileInputDStream, which is meant for monitoring a folder to which new files are added regularly. With your 2-second batch interval the folder is checked every two seconds, and only files that are new (modified after the stream started and not already picked up) are processed.

If you only need to read the existing files once, use SparkContext.textFile (JavaSparkContext.textFile in Java) instead.

Here is the documentation from the source code of JavaStreamingContext.textFileStream():

/**
 * Create an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). Files must be written to the
 * monitored directory by "moving" them from another location within the same
 * file system. File names starting with . are ignored.
 */
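Putting both answers together, here is a minimal sketch, using only java.nio, of preparing a file in a staging directory (assumed to be outside the monitored directory but on the same file system), touching it, and then moving it in with a single atomic rename. The class StreamFilePublisher and its publish method are hypothetical names, not part of Spark.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

/**
 * Hypothetical helper: "touch" a fully written file, then move it into the
 * monitored directory with one atomic rename, so the stream never sees a
 * partially written file and its mod time is newer than the stream's start.
 */
class StreamFilePublisher {

    static Path publish(Path source, Path monitoredDir) throws IOException {
        // "touch": set the modification time to now
        Files.setLastModifiedTime(source, FileTime.fromMillis(System.currentTimeMillis()));
        Path target = monitoredDir.resolve(source.getFileName());
        // ATOMIC_MOVE throws AtomicMoveNotSupportedException if the move cannot be done
        // as one file-system operation (e.g. across file systems), matching the
        // "same file system" requirement in the doc comment
        return Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

For example, publish(Paths.get("staging/part-0001.txt"), Paths.get("filesDirectory")) would move the file into the directory watched by the question's textFileStream call. Once moved, the file should not be modified again.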

Upvotes: 1
