Reputation: 809
In my Flink code, I am streaming a file located in an HDFS folder, and I get the error "(No such file or directory)". However, I am sure the file name and address are correct, because I used the same path in the batch methods and everything worked smoothly. Does anyone know what the problem could be? Here is my code:
DataStream<MyObject> myStream =
    env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));
and its related class:
public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo;
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long dataStartTime;
        rowNo++;
        if (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            if (myObject != null)
                sourceContext.collect(myObject);
        } else {
            return;
        }
        while (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            sourceContext.collect(myObject);
        }
        this.reader.close();
        this.reader = null;
        this.inputStream.close();
        this.inputStream = null;
    }

    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ioe) {
            //
        } finally {
            this.reader = null;
            this.inputStream = null;
        }
    }
}
Upvotes: 2
Views: 1558
Reputation: 18987
You are trying to access a file in HDFS with Java's regular FileInputStream. FileInputStream can only access the local file system; it does not know anything about talking to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's FileInputFormat as an example.
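To make the failure concrete, here is a minimal sketch using only the JDK. It shows that FileInputStream treats an "hdfs://" string as an ordinary local path, which is where the "(No such file or directory)" message comes from. The class name LocalVsHdfsPath, its helper methods, and the host "namenode:8020" are hypothetical and exist only for illustration; the scheme check is a simple heuristic that assumes the path is a well-formed URI.

```java
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

// Hypothetical helper, not part of Flink or Hadoop.
final class LocalVsHdfsPath {

    // True when the path carries a non-local URI scheme such as "hdfs".
    // Assumes the string is a syntactically valid URI (no spaces or backslashes).
    static boolean needsHadoopClient(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme != null && !scheme.equalsIgnoreCase("file");
    }

    // Opens the path the same way the question's source function does
    // and reports what happened.
    static String tryOpenLocally(String path) {
        try (FileInputStream in = new FileInputStream(path)) {
            return "opened";
        } catch (FileNotFoundException e) {
            // FileInputStream looked for the string on the local file system.
            return "not found";
        } catch (IOException e) {
            return "io error";
        }
    }

    public static void main(String[] args) {
        // Hypothetical address; substitute your own NameNode host and path.
        String path = "hdfs://namenode:8020/Data/Dataset1.csv";
        System.out.println("needs Hadoop client: " + needsHadoopClient(path));
        System.out.println("local open result:   " + tryOpenLocally(path));
    }
}
```

The batch API worked with the same string because Flink's own input formats resolve the scheme and hand the path to the matching file system implementation, whereas FileInputStream never looks at the scheme at all.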
However, I would try to avoid implementing this yourself if possible. You could use Flink's FileInputFormat to read the file line by line (which returns a DataStream<String>), followed by a (flat) mapper that parses each line.
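The parsing half of that suggestion can be sketched without Flink. The example below uses plain Java streams to show the flat-map idea: each line yields zero or one parsed object, so malformed lines are dropped instead of emitting null. The Record class is a hypothetical stand-in for the question's MyObject, and the two-column CSV layout is an assumption. In a Flink job the lines would come from a text source (for example StreamExecutionEnvironment.readTextFile with the hdfs:// path, in Flink versions that still offer it) and the body of parseLine would move into a FlatMapFunction.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of the "read lines, then flat-map to objects" pattern.
final class LineParsingSketch {

    // Stand-in for the question's MyObject; the fields are assumed.
    static final class Record {
        final String id;
        final String name;

        Record(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Flat-map step: returns an empty stream for lines that cannot be parsed,
    // otherwise a stream with exactly one Record.
    static Stream<Record> parseLine(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length < 2 || fields[0].trim().isEmpty()) {
            return Stream.empty();
        }
        return Stream.of(new Record(fields[0].trim(), fields[1].trim()));
    }

    // Applies the flat-map step to every line and collects the results.
    static List<Record> parseAll(List<String> lines) {
        return lines.stream()
                .flatMap(LineParsingSketch::parseLine)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1,Alice", "", "broken", "2,Bob");
        for (Record r : parseAll(lines)) {
            System.out.println(r.id + " -> " + r.name);
        }
    }
}
```

A flat mapper fits better than a plain mapper here because it can emit nothing for a bad line, which replaces the null check in the original source function.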
Upvotes: 5