Anon

Reputation: 320

Unable to get any data when running a Spark Streaming program with textFileStream as the source

I am running the following code in the Spark shell:

$ spark-shell

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark._
import org.apache.spark._

scala> object sparkClient{
 | def main(args : Array[String])
 | {
 | val ssc = new StreamingContext(sc,Seconds(1))
 | val Dstreaminput = ssc.textFileStream("hdfs:///POC/SPARK/DATA/*")
 | val transformed = Dstreaminput.flatMap(word => word.split(" "))
 | val mapped = transformed.map(word => if(word.contains("error"))(word,"defect")else(word,"non-defect"))
 | mapped.print()
 | ssc.start()
 | ssc.awaitTermination()
 | }
 | }
defined object sparkClient

scala> sparkClient.main(null)

The output is blank, as shown below; no files are read and nothing is streamed:


Time: 1510663547000 ms
Time: 1510663548000 ms
Time: 1510663549000 ms
Time: 1510663550000 ms
Time: 1510663551000 ms
Time: 1510663552000 ms
Time: 1510663553000 ms
Time: 1510663554000 ms
Time: 1510663555000 ms


The input path I gave in the code above contains the following files:

[hadoopadmin@master ~]$ hadoop fs -ls /POC/SPARK/DATA/
17/11/14 18:04:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
-rw-r--r--   2 hadoopadmin supergroup      17881 2017-09-21 11:02 /POC/SPARK/DATA/LICENSE
-rw-r--r--   2 hadoopadmin supergroup      24645 2017-09-21 11:04 /POC/SPARK/DATA/NOTICE
-rw-r--r--   2 hadoopadmin supergroup        845 2017-09-21 12:35 /POC/SPARK/DATA/confusion.txt

Could anyone please explain where I am going wrong? Is there anything wrong with the syntax (although I did not encounter any errors)? I am new to Spark.

Upvotes: 1

Views: 578

Answers (2)

Shailesh Pratapwar

Reputation: 4224

Everyone on earth has a right to be happy, be it Spark itself or a Spark developer.

Spark Streaming's textFileStream() only processes files whose modification time falls after the streaming context was started. In other words, Spark Streaming will not read files that already exist in the directory.
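
To make that timing concrete, here is a minimal sketch (not from the original answer; it reuses the asker's directory) of the required ordering: start the context first, and only files appearing afterwards are picked up.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// In spark-shell, `sc` is the SparkContext provided by the shell.
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.textFileStream("hdfs:///POC/SPARK/DATA/")
lines.print()
ssc.start()
// Only files that land in hdfs:///POC/SPARK/DATA/ *after* this point
// (with a fresh modification time) are seen by the stream.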

So you may think you can simply copy new files into the directory. The problem is that copying does not update the file's modification time, so the copied file is still ignored.

The remaining option is to create brand-new files on the fly, which is tedious, and it has to happen while the streaming job is running.

I wrote a simple Java program that creates such files on the fly, so everyone is now happy. :-) (You only need the commons-io library on the classpath; it is a single jar.)

import java.awt.Button;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import javax.swing.JFrame;

import org.apache.commons.io.IOUtils;

public class CreateFileMain extends JFrame {

    private static final long serialVersionUID = 1L;

    Button b;

    public CreateFileMain() {
        b = new Button("Create New File");

        // Each click empties the input directory and writes one brand-new
        // file, so its modification time is always after the stream started.
        b.addActionListener(new ActionListener() {

            @Override
            public void actionPerformed(ActionEvent e) {
                String dir = "C:/Users/spratapw/workspace/batchload1/spark-streaming-poc/input/";
                deleteExistingFiles(dir);

                // Random file name so Spark never sees the same path twice
                Random r = new Random();
                File f = new File(dir + r.nextInt() + ".txt");
                createNewFile(f);
            }

            private void createNewFile(File f) {
                try {
                    f.createNewFile();
                    List<String> lines = new ArrayList<>();
                    lines.add("Hello World");
                    FileOutputStream fos = new FileOutputStream(f);
                    IOUtils.writeLines(lines, "\n", fos, Charset.defaultCharset());
                    fos.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }

            private void deleteExistingFiles(String dir) {
                File filetodelete = new File(dir);
                File[] allContents = filetodelete.listFiles();
                if (allContents != null) {
                    for (File file : allContents) {
                        file.delete();
                    }
                }
            }
        });

        this.add(b);
        this.setLayout(new FlowLayout());
    }

    public static void main(String[] args) throws IOException {
        CreateFileMain m = new CreateFileMain();
        m.setVisible(true);
        m.setSize(200, 200);
        m.setLocationRelativeTo(null);
        m.setDefaultCloseOperation(EXIT_ON_CLOSE);
    }
}

Output: (screenshot of the "Create New File" window)

Upvotes: 0

user8939461

Reputation: 21

textFileStream won't read pre-existing data. Per the documentation, it only includes new files that are

> created in the dataDirectory by atomically moving or renaming them into the data directory.

https://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
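
A minimal sketch of that recommendation (the staging path below is hypothetical): write the file somewhere else on the same HDFS filesystem first, then atomically rename it into the watched directory while the stream is running.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
// rename() is atomic within a single HDFS filesystem, so the stream
// never observes a partially written file.
fs.rename(
  new Path("hdfs:///POC/SPARK/staging/confusion.txt"), // fully written here first
  new Path("hdfs:///POC/SPARK/DATA/confusion.txt"))    // directory watched by textFileStream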

Upvotes: 2
