Harshith Bolar

Reputation: 826

Flink: How to pass a dynamic path while writing to files using writeAsText(path)?

Let's say I have a stream of String elements. I want to write each element of the stream to a separate file in some folder. I'm using the following setup:

stream.writeAsText(path).setParallelism(1);

How do I make this path dynamic? I even tried appending System.nanoTime() to the path to make it unique, but it still doesn't work: everything gets written to a single file.

Upvotes: 2

Views: 1319

Answers (2)

David Anderson

Reputation: 43439

This sort of use case is explicitly supported in Flink by the rolling file sink (BucketingSink) with a custom Bucketer, or by the newer and preferred StreamingFileSink with a custom BucketAssigner and RollingPolicy.
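A minimal sketch of the StreamingFileSink approach, assuming a Flink 1.x release that ships StreamingFileSink and BucketAssigner (roughly 1.7 onward; newer releases also offer FileSink). The class names PerElementFileSink, PerElementBucketAssigner and RollOnEveryElement, the output path /tmp/flink-output and the bucket naming scheme are all hypothetical. The BucketAssigner supplies the dynamic part of the path (one sub-directory per element), and the RollingPolicy starts a new part file for every element.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class PerElementFileSink {

    // Puts each element into its own bucket, i.e. a sub-directory of the base path.
    // Hypothetical naming scheme: the bucket id is derived from the element's hash code.
    public static class PerElementBucketAssigner implements BucketAssigner<String, String> {
        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            return "element-" + Integer.toHexString(element.hashCode());
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    // Opens a new part file before every element is written, so each element lands in its own file.
    public static class RollOnEveryElement implements RollingPolicy<String, String> {
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            return true;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, String element) {
            return true;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink commits finished part files only when a checkpoint completes.
        env.enableCheckpointing(10_000);

        // Illustrative input only; a real job would read from an unbounded source.
        DataStream<String> stream = env.fromElements("first", "second", "third");

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/tmp/flink-output"), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new PerElementBucketAssigner())
                .withRollingPolicy(new RollOnEveryElement())
                .build();

        stream.addSink(sink).setParallelism(1);
        env.execute("one file per element");
    }
}
```

Because part files are only committed on checkpoints, a small bounded source like fromElements may finish before any checkpoint completes, leaving files in the in-progress or pending state. Rolling on every element also produces many small files, which can become a problem at high volume.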

Upvotes: 2

vividpk21

Reputation: 384

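Your problem is that DataStream.writeAsText(path) sends every element of the stream to the same file. The path is fixed when the sink is created, so System.nanoTime() is evaluated only once, when the job is defined, and with setParallelism(1) there is a single writer. You will therefore only ever get a single file.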
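A toy model in plain Java (no Flink involved; FixedPathSink and buildSink are hypothetical names) of why appending System.nanoTime() to the path still yields one file: the expression runs once, when the sink object is constructed, and every element is then written to that already-computed path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class PathEvaluatedOnce {

    // Hypothetical stand-in for writeAsText(path): it receives its path once, at construction.
    static final class FixedPathSink {
        private final String path;
        private final List<String> targets = new ArrayList<>();

        FixedPathSink(String path) {
            this.path = path;
        }

        // Every element is recorded as going to the same, precomputed path.
        void write(String element) {
            targets.add(path);
        }

        List<String> targets() {
            return targets;
        }
    }

    // Defining the "job": System.nanoTime() is called exactly once here, not once per element.
    static FixedPathSink buildSink() {
        return new FixedPathSink("/tmp/out-" + System.nanoTime() + ".txt");
    }

    public static void main(String[] args) {
        FixedPathSink sink = buildSink();
        for (String element : Arrays.asList("a", "b", "c")) {
            sink.write(element);
        }
        // Three elements, but only one distinct target path.
        System.out.println(new HashSet<>(sink.targets()).size()); // prints 1
    }
}
```

To get a different file per element, the path has to be chosen per element inside the sink, which is what a bucketing sink does.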

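The flatMap example below returns a new DataStream (not a collection) in which each input string is split into several elements, one per word. It does not write anything to files by itself, so it would have to be combined with a sink that picks a separate path for each element.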

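import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// dataStream is an existing DataStream<String>; each line is split on spaces,
// and every word becomes a separate element of the resulting stream.
DataStream<String> words = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});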

Taken straight from the documentation here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html

Upvotes: 1
