Reputation: 320
I am running the following code in the Spark shell:
spark-shell
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> import org.apache.spark._
import org.apache.spark._
scala> object sparkClient{
| def main(args : Array[String])
| {
| val ssc = new StreamingContext(sc,Seconds(1))
| val Dstreaminput = ssc.textFileStream("hdfs:///POC/SPARK/DATA/*")
| val transformed = Dstreaminput.flatMap(word => word.split(" "))
| val mapped = transformed.map(word => if(word.contains("error"))(word,"defect")else(word,"non-defect"))
| mapped.print()
| ssc.start()
| ssc.awaitTermination()
| }
| }
defined object sparkClient
scala> sparkClient.main(null)
The output is blank: no file is read and no streaming takes place.
The path which I have given as input in the above code is as follows:
[hadoopadmin@master ~]$ hadoop fs -ls /POC/SPARK/DATA/
17/11/14 18:04:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   2 hadoopadmin supergroup   17881 2017-09-21 11:02 /POC/SPARK/DATA/LICENSE
-rw-r--r--   2 hadoopadmin supergroup   24645 2017-09-21 11:04 /POC/SPARK/DATA/NOTICE
-rw-r--r--   2 hadoopadmin supergroup     845 2017-09-21 12:35 /POC/SPARK/DATA/confusion.txt
Could anyone please explain where I am going wrong? Or is there anything wrong with the syntax (although I did not encounter any errors)? I am new to Spark.
Upvotes: 1
Views: 578
Reputation: 4224
Everyone on earth has a right to be happy, be it Spark itself or a Spark developer.
Spark Streaming's textFileStream() requires files to be created (or modified) after the streaming process has started. This means Spark Streaming will not read files that already exist in the directory.
You might think you can simply copy new files in, but that is a problem too: copying does not update the file's modification time.
The last option is to create new files on the fly. That is tedious, and it has to happen while the streaming job is running.
I wrote a simple Java program that creates the files on the fly, so everyone is happy now. :-) (You just need the commons-io library on the classpath; a single jar.)
import java.awt.Button;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import javax.swing.JFrame;

import org.apache.commons.io.IOUtils;

public class CreateFileMain extends JFrame {

    private static final long serialVersionUID = 1L;
    Button b;

    public CreateFileMain() {
        b = new Button("Create New File");
        b.addActionListener(new ActionListener() {

            @Override
            public void actionPerformed(ActionEvent e) {
                String dir = "C:/Users/spratapw/workspace/batchload1/spark-streaming-poc/input/";
                // Clear out old files, then write a brand-new file whose
                // modification time falls after the streaming job started.
                deleteExistingFiles(dir);
                Random r = new Random();
                File f = new File(dir + r.nextInt() + ".txt");
                createNewFile(f);
            }

            private void createNewFile(File f) {
                try {
                    f.createNewFile();
                    List<String> lines = new ArrayList<>();
                    lines.add("Hello World");
                    FileOutputStream fos = new FileOutputStream(f);
                    IOUtils.writeLines(lines, "\n", fos, Charset.defaultCharset());
                    fos.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }

            private void deleteExistingFiles(String dir) {
                File fileToDelete = new File(dir);
                File[] allContents = fileToDelete.listFiles();
                if (allContents != null) {
                    for (File file : allContents) {
                        file.delete();
                    }
                }
            }
        });

        this.add(b);
        this.setLayout(new FlowLayout());
    }

    public static void main(String[] args) throws IOException {
        CreateFileMain m = new CreateFileMain();
        m.setVisible(true);
        m.setSize(200, 200);
        m.setLocationRelativeTo(null);
        m.setDefaultCloseOperation(EXIT_ON_CLOSE);
    }
}
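For reference, roughly the same generator can be sketched in a few lines of Scala, skipping the GUI. This is only a sketch; the input path below is a placeholder, so point it at whatever directory your stream actually watches:

import java.io.{File, PrintWriter}
import scala.util.Random

// Placeholder path: use the directory that textFileStream monitors.
val dir = new File("spark-streaming-poc/input")
dir.mkdirs()

// Remove old files, then create a brand-new one so its modification
// time falls after the streaming context was started.
Option(dir.listFiles()).foreach(_.foreach(_.delete()))
val out = new PrintWriter(new File(dir, s"${Random.nextInt(Int.MaxValue)}.txt"))
out.println("Hello World")
out.close()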
Upvotes: 0
Reputation: 21
textFileStream won't read pre-existing data. It only picks up new files that are "created in the dataDirectory by atomically moving or renaming them into the data directory."
https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
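In other words: start the StreamingContext first, then move files into the watched directory. On HDFS a rename is atomic, so a minimal sketch from the same spark-shell session could look like this (the staging path is an assumption; only the target directory comes from the question):

import org.apache.hadoop.fs.{FileSystem, Path}

// rename() is atomic on HDFS, so the file shows up in the monitored
// directory as a brand-new file and the running stream picks it up.
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(
  new Path("/POC/SPARK/STAGE/confusion.txt"),  // assumed staging location
  new Path("/POC/SPARK/DATA/confusion.txt"))   // directory the stream watches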
Upvotes: 2