Ahmad.S

Reputation: 809

Streaming a File From HDFS Address in Apache Flink

In my Flink code, I am streaming a file located in an HDFS folder, but I get the error "(No such file or directory)". I am sure the file name and address are correct, since I used the same path with the batch methods and everything worked smoothly. Does anyone know what the problem could be? Here is my code:

DataStream<MyObject> myStream =
        env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));

and its related class:

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo;
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        // open the data file and wrap it in a line reader
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long dataStartTime;
        rowNo++;
        // emit the first line, or return if nothing could be read
        if (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            if (myObject != null) {
                sourceContext.collect(myObject);
            }
        } else {
            return;
        }
        // emit the remaining lines, skipping rows that fail to parse
        while (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            if (myObject != null) {
                sourceContext.collect(myObject);
            }
        }
        this.reader.close();
        this.reader = null;
        this.inputStream.close();
        this.inputStream = null;
    }

    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ioe) {
            // ignore errors while closing on cancellation
        } finally {
            this.reader = null;
            this.inputStream = null;
        }
    }
}

Upvotes: 2

Views: 1558

Answers (1)

Fabian Hueske

Reputation: 18987

You are trying to access a file in HDFS with Java's regular FileInputStream. FileInputStream can only access the local file system; it does not know how to talk to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's FileInputFormat as an example.
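A minimal sketch of that idea, assuming Flink can find the Hadoop dependencies (for example through HADOOP_CLASSPATH): instead of java.io.FileInputStream, the path is resolved through Flink's own org.apache.flink.core.fs.FileSystem abstraction, which picks the implementation from the URI scheme, so hdfs:// URIs go to the HDFS client and file:// URIs go to the local file system. The class name HdfsLineSource and its openReader helper are hypothetical, and the URI is expected to be fully qualified (typically hdfs://host:port/path). The source only emits raw lines; parsing is left to a downstream operator.

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/** Hypothetical source that reads a file line by line via Flink's FileSystem layer. */
public class HdfsLineSource implements SourceFunction<String> {

    private final String uri;                 // fully qualified URI, e.g. hdfs://... or file://...
    private volatile boolean running = true;  // set to false by cancel()

    public HdfsLineSource(String uri) {
        this.uri = uri;
    }

    /** Opens any URI Flink has a FileSystem for as a UTF-8 line reader. */
    public static BufferedReader openReader(String uri) throws IOException {
        Path path = new Path(uri);
        FileSystem fs = path.getFileSystem();  // implementation chosen by the URI scheme
        FSDataInputStream in = fs.open(path);
        return new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (BufferedReader reader = openReader(uri)) {
            String line;
            // readLine() returns null only at end of file, unlike BufferedReader.ready()
            while (running && (line = reader.readLine()) != null) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

Because the same code path serves file:// URIs, the reading logic can be tried locally before pointing it at HDFS.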

However, I would try to avoid implementing this yourself if possible. You could use Flink's FileInputFormat to read the file line by line (which returns a DataStream<String>) and a subsequent (flat) map function that parses each line.
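A sketch of the suggested approach for the Flink 1.x DataStream API: StreamExecutionEnvironment.readTextFile reads the file line by line through a text FileInputFormat and returns a DataStream<String>, and a FlatMapFunction turns each line into an object. Row and ParseLine are hypothetical stand-ins for the question's MyObject and MyObject.fromString; newer Flink releases steer users toward the FileSource connector instead of readTextFile.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadCsvJob {

    /** Hypothetical stand-in for MyObject: one CSV row split into fields. */
    public static class Row {
        public String[] fields;

        public Row() {}

        public Row(String[] fields) {
            this.fields = fields;
        }
    }

    /** Parses one line; blank lines produce no output instead of a null record. */
    public static class ParseLine implements FlatMapFunction<String, Row> {
        @Override
        public void flatMap(String line, Collector<Row> out) {
            if (line == null || line.trim().isEmpty()) {
                return;  // nothing to emit
            }
            out.collect(new Row(line.split(",", -1)));  // -1 keeps empty trailing fields
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // args[0]: the full hdfs:// URI of the input file
        DataStream<String> lines = env.readTextFile(args[0]);
        DataStream<Row> rows = lines.flatMap(new ParseLine());
        rows.print();
        env.execute("read csv from hdfs");
    }
}
```

Using flatMap rather than map lets the parser drop blank or malformed lines, since a map function must emit exactly one non-null record per input.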

Upvotes: 5
