Reputation: 73
I have the following code:
SparkConf sparkConf = new SparkConf().setAppName("My app")
.setMaster("local[4]")
.set("spark.executor.memory", "2g")
.set("spark.driver.allowMultipleContexts", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
JavaDStream<String> trainingData = jssc.textFileStream("filesDirectory");
trainingData.print();
jssc.start();
jssc.awaitTermination();
Unfortunately, to stream any file that already exists in the directory, I have to edit the file and rename it after starting the streaming context; otherwise it is not processed.
Do I have to edit and rename each file to process it, or is there another way to process the existing files by just editing and saving them?
P.S. When I move a new file into this directory, I also have to edit and rename it before it is streamed!
Upvotes: 1
Views: 2343
Reputation: 544
Try touching the file before moving it to the destination directory. Below is what the javadoc says:
Identify whether the given path is a new file for the batch of currentTime. For it to be accepted, it has to pass the following criteria:
- It must pass the user-provided file filter.
- It must be newer than the ignore threshold.
- It must not be present in the recently selected files that this class remembers.
- It must not be newer than the time of the batch, i.e. currentTime for which this file is being tested.
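The touch-then-move idea can be sketched with plain JDK file APIs (the staging and monitored paths below are hypothetical, chosen only for illustration): refresh the modification time so the file falls inside the current batch window, then rename it into the monitored directory on the same filesystem.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

public class MoveIntoMonitoredDir {
    public static void main(String[] args) throws IOException {
        // Hypothetical paths: a staging area and the directory the stream monitors.
        Path staging = Files.createDirectories(Paths.get("/tmp/staging"));
        Path monitored = Files.createDirectories(Paths.get("/tmp/filesDirectory"));

        Path src = staging.resolve("data.txt");
        Files.write(src, "hello streaming".getBytes());

        // "Touch": refresh the modification time so it falls inside the
        // current batch window of the running streaming context.
        Files.setLastModifiedTime(src, FileTime.fromMillis(System.currentTimeMillis()));

        // Atomic rename into the monitored directory (same filesystem),
        // as the textFileStream javadoc requires.
        Files.move(src, monitored.resolve("data.txt"), StandardCopyOption.ATOMIC_MOVE);
    }
}
```

The move must stay within one filesystem; a copy across filesystems would make the file visible while it is still being written.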
Upvotes: 4
Reputation: 2977
JavaStreamingContext.textFileStream
returns a stream backed by FileInputDStream
, which is meant to monitor a folder whose files are regularly added or updated. With your two-second batch interval, you get a notification every two seconds, but only when a new file has been added or updated.
If your intent is just to read the existing files once, you can use SparkContext.textFile (JavaSparkContext.textFile from Java) instead.
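A minimal batch-read sketch, reusing the app name, master, and "filesDirectory" path from the question (this assumes Spark is on the classpath and the directory exists):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadExistingFiles {
    public static void main(String[] args) {
        // Same app name and master as in the question; no streaming context needed.
        SparkConf conf = new SparkConf().setAppName("My app").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Reads every file already present in the directory as one RDD of lines.
        JavaRDD<String> trainingData = sc.textFile("filesDirectory");
        trainingData.collect().forEach(System.out::println);

        sc.stop();
    }
}
```

Unlike textFileStream, this processes files that were already in the directory when the job starts, with no renaming tricks required.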
Looking at the documentation in the source code of JavaStreamingContext.textFileStream():
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
*/
Upvotes: 1