Reputation: 826
Let's say I have a Stream with elements of type String. I want to write each element in the stream to a separate file in some folder. I'm using the following setup:
stream.writeAsText(path).setParallelism(1);
How do I make this path dynamic? I even tried appending System.nanoTime() to the path, but it still doesn't work: everything gets written to a single file.
Upvotes: 2
Views: 1319
Reputation: 43439
This sort of use case is explicitly supported in Flink by the Rolling File Sink (BucketingSink) with a custom Bucketer, or by the newer and preferred StreamingFileSink with a custom BucketAssigner and RollingPolicy.
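A minimal sketch of the StreamingFileSink approach, assuming Flink 1.7 or a later 1.x release (where BucketAssigner exists) and a local output directory /tmp/flink-out. FirstLetterBucketAssigner and RollOnEveryElement are hypothetical names chosen for illustration: the assigner puts each string into a sub-folder named after its first letter, and the rolling policy starts a new part file before every element, so each element ends up in its own file.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class OneFilePerElement {

    // Hypothetical bucket assigner: routes each string into a sub-folder
    // ("bucket") named after its first letter, e.g. "letter=a".
    public static class FirstLetterBucketAssigner implements BucketAssigner<String, String> {
        @Override
        public String getBucketId(String element, BucketAssigner.Context context) {
            if (!element.isEmpty() && Character.isLetter(element.charAt(0))) {
                return "letter=" + Character.toLowerCase(element.charAt(0));
            }
            return "other";
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            // Bucket IDs are plain strings, so Flink's string serializer suffices.
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    // Hypothetical rolling policy: closes the current part file before every
    // new element, so each element is written to a part file of its own.
    public static class RollOnEveryElement implements RollingPolicy<String, String> {
        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            return true;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, String element) {
            return true;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
            return false; // no time-based rolling needed
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // StreamingFileSink only finalizes part files when a checkpoint completes.
        env.enableCheckpointing(10_000);

        // Placeholder source; replace with your real (unbounded) stream.
        DataStream<String> stream = env.fromElements("apple", "banana", "avocado", "42");

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/tmp/flink-out"), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new FirstLetterBucketAssigner())
                .withRollingPolicy(new RollOnEveryElement())
                .build();

        stream.addSink(sink);
        env.execute("one-file-per-element");
    }
}
```

Because part files only become finished when a checkpoint completes, a bounded source such as fromElements may let the job end before that happens, leaving the last files in-progress; in practice the sink would be attached to a long-running stream. Rolling on every element also produces many tiny files, which file systems like HDFS handle poorly, so rolling on size or time is usually preferable unless one file per element is a hard requirement.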
Upvotes: 2
Reputation: 384
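Your problem is that DataStream.writeAsText() writes every element of the stream to the one path you give it, and that path is fixed when the job is built (System.nanoTime() is evaluated only once, in your main method), so with a parallelism of 1 you will only ever get a single file.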
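It looks like flatMap can help here: it returns a new DataStream (not a collection) in which every word of each input string becomes a separate element, which you can then pass to a sink that outputs them as different files.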
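import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Split each incoming line into words; every word becomes its own stream element.
DataStream<String> words = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});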
Taken straight from the documentation here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html
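flatMap only reshapes the stream; writing each element to its own file still needs a sink that does so. As an alternative to the built-in file sinks, a hand-written sink can be sketched as follows, assuming Flink 1.x (where RichSinkFunction and open(Configuration) are available) and a local directory. FilePerElementSink and fileName are hypothetical names; the sink is not integrated with checkpointing, so after a failure and restart elements may be written again or overwrite earlier files.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink: writes every element it receives to its own file in a
// local directory. Not fault tolerant (no checkpoint integration).
public class FilePerElementSink extends RichSinkFunction<String> {

    private final String directory;
    // Runtime-only state, initialized in open() on each parallel instance.
    private transient int subtask;
    private transient long counter;

    public FilePerElementSink(String directory) {
        this.directory = directory;
    }

    // Hypothetical naming scheme: unique per subtask and per element.
    public static String fileName(int subtask, long counter) {
        return "element-" + subtask + "-" + counter + ".txt";
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        subtask = getRuntimeContext().getIndexOfThisSubtask();
        counter = 0;
        Files.createDirectories(Paths.get(directory));
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // One new file per element.
        Path file = Paths.get(directory, fileName(subtask, counter++));
        Files.write(file, value.getBytes(StandardCharsets.UTF_8));
    }
}
```

It would be attached with something like dataStream.addSink(new FilePerElementSink("/tmp/flink-out")); including the subtask index in the file name keeps parallel sink instances from overwriting each other's files.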
Upvotes: 1