Patrick

Reputation: 1066

Flink: Write tuples with CSV header into file

I did some data processing using Flink (1.7.1 with Hadoop). At the end I'd like to write the dataset consisting of 2-tuples into a file. Currently, I am doing it like this:

DataSet<Tuple2<Integer, Point>> pointsClustered = points.getClusteredPoints(...);
pointsClustered.writeAsCsv(params.get("output"), "\n", ",");

However, I would like to have a CSV header written into the first line. Flink's Javadoc doesn't mention any option for this, and I couldn't find a solution by googling either.

Could you kindly advise how to accomplish that? Thanks a lot!

Upvotes: 0

Views: 1407

Answers (2)

karthiks3000

Reputation: 912

I was able to get around the limitation by simply adding a header row to the dataset using a union. With the sink's parallelism set to 1, the header row ends up as the first line of the export (note that the DataSet API doesn't strictly guarantee row order across a union, so it's worth verifying the output).

DataSet<Tuple8<String, String, String, String, String, String, String, String>> headers = env.fromElements(
            Tuple8.of(
                 "SDMId", "ActivityType", "ActionType", "ActivityId", "ActivityLevel", "Timestamp", "SessionId", "Value"
            ));

DataSet<Tuple8<String, String, String, String, String, String, String, String>> results =
            headers.union(skillResults);

results.writeAsCsv("file:///Users/karthsub/Desktop/first_export.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
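Applied to the question's Tuple2<Integer, Point> dataset, the same union trick requires converting every field to String first, because the header tuple and the data tuples must share one type. A rough sketch, assuming the column names "clusterId" and "point", an illustrative output path, and that Point has a usable toString():

```java
// Sketch only: adapts the union trick to the asker's Tuple2<Integer, Point>.
// Column names and the output path are made up for illustration.
DataSet<Tuple2<String, String>> asStrings = pointsClustered
        .map(t -> Tuple2.of(t.f0.toString(), t.f1.toString()))
        .returns(Types.TUPLE(Types.STRING, Types.STRING)); // type hint needed for lambdas

DataSet<Tuple2<String, String>> header =
        env.fromElements(Tuple2.of("clusterId", "point"));

header.union(asStrings)
        .writeAsCsv("file:///tmp/clustered.csv", "\n", ",", FileSystem.WriteMode.OVERWRITE)
        .setParallelism(1); // single file, header first
```

The `.returns(...)` hint is there because Flink cannot extract tuple type information from a Java lambda on its own.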

Upvotes: 0

Till Rohrmann

Reputation: 13356

Flink's own CsvOutputFormat does not support this functionality. What you could do is extend the CsvOutputFormat and override its open method so that it writes the header when the format is opened. Then use DataSet#output to specify the newly created output format:

import java.io.IOException;
import java.io.PrintWriter;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.Path;

public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSource<Integer> input = env.fromElements(1, 2, 3);
    DataSet<Tuple3<Integer, String, Double>> result = input
        .map((MapFunction<Integer, Tuple3<Integer, String, Double>>) integer ->
            Tuple3.of(integer, integer.toString(), 42.0));

    Path outputPath = new Path("hdfs:///foobar");
    result.output(new MyCsvOutputFormat(outputPath));

    env.execute();
}

private static class MyCsvOutputFormat<T extends Tuple> extends CsvOutputFormat<T> {

    public MyCsvOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Open the format first; super.open() initializes the underlying stream.
        super.open(taskNumber, numTasks);

        // Write the header before any records. Don't close the writer here:
        // closing it would also close the stream the records are written to.
        PrintWriter wrt = new PrintWriter(stream);
        wrt.println("Foo|bar|foobar");
        wrt.flush();
    }
}

Upvotes: 2
