Reputation: 1
I am new to Apache Flink and trying to learn data streams. I am reading student data, which has 3 columns (name, subject and marks), from a CSV file. I apply a filter on marks and select only the records where marks are at least 40.
I am trying to write this data to a CSV file. The program runs successfully, but the CSV file stays empty; no data gets written to it. I have tried several variants of the CSV-writing call, and none of them worked. I am running this locally through Eclipse, and writing to a plain text file works fine.
DataStream<String> text = env.readFile(
        format,
        params.get("input"),
        FileProcessingMode.PROCESS_CONTINUOUSLY,  // keep watching the input path
        100);                                     // monitoring interval in ms

DataStream<String> filtered = text.filter(new FilterFunction<String>() {
    public boolean filter(String value) {
        String[] tokens = value.split(",");
        return Integer.parseInt(tokens[2]) >= 40;  // keep records with marks >= 40
    }
});

filtered.writeAsText("testFilter", WriteMode.OVERWRITE);

DataStream<Tuple2<String, Integer>> tokenized = filtered
        .map(new MapFunction<String, Tuple2<String, Integer>>() {
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>("Test", Integer.valueOf(1));
            }
        });

tokenized.print();

tokenized.writeAsCsv(
        "file:///home/Test/Desktop/output.csv",
        WriteMode.OVERWRITE,
        "\n",
        ",");

try {
    env.execute();
} catch (Exception e1) {
    e1.printStackTrace();
}
Below is my input CSV format:
Name1,Subj1,30
Name1,Subj2,40
Name1,Subj3,40
Name1,Subj4,40
tokenized.print() prints all records correctly.
Upvotes: 0
Views: 3393
Reputation: 205
You should remove the tokenized.print(); call that comes before tokenized.writeAsCsv();. The print() will consume the data.
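For clarity, this is what that change looks like applied to the snippet from the question (a sketch only, reusing the question's own path and delimiters; I have not verified that print() is the cause here):

// Sketch of the suggested fix: drop the print() sink, keep only writeAsCsv().
// tokenized.print();  // removed, per the suggestion above
tokenized.writeAsCsv(
        "file:///home/Test/Desktop/output.csv",
        WriteMode.OVERWRITE,
        "\n",
        ",");

env.execute();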
Upvotes: 0
Reputation: 43499
I did a little experimenting, and found that this job works just fine:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteCSV {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(new Tuple2<>("abc", 1), new Tuple2<>("def", 2))
                .writeAsCsv("file:///tmp/test.csv", FileSystem.WriteMode.OVERWRITE, "\n", ",");

        env.execute();
    }
}
If I don't set the parallelism to 1, then the results are different. In that case, test.csv is a directory containing four files, each written by one of the four parallel subtasks.
I'm not sure what's wrong in your case, but maybe you can work backwards from this example (assuming it works for you).
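A variation worth knowing about: instead of lowering the parallelism of the whole job with env.setParallelism(1), you can set the parallelism on the sink alone, since writeAsCsv() returns a DataStreamSink that accepts setParallelism(). A minimal sketch along the lines of the example above (the class name is mine):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteCSVSingleSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The rest of the pipeline keeps the default parallelism; only the
        // sink runs as a single subtask, so test.csv ends up as one file
        // instead of a directory of per-subtask files.
        env.fromElements(new Tuple2<>("abc", 1), new Tuple2<>("def", 2))
                .writeAsCsv("file:///tmp/test.csv", FileSystem.WriteMode.OVERWRITE, "\n", ",")
                .setParallelism(1);

        env.execute();
    }
}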
Upvotes: 1