Reputation: 137
I am using Apache Flink via the Scala API, and at some point I obtain a DataSet[(Int, Int, Int)]. The result of calling writeAsCsv() or writeAsText() on it is unexpected: it creates a directory whose location and name are taken from the first parameter of the method call (e.g. filePath). Two files named "1" and "2" appear in that directory, and they contain the DataSet's data. It seems that the DataSet's content is partitioned across these two files.
When I tried to recreate this behavior in a more concise code fragment, I could not: a single file with the expected name was created at the expected location, and no directory was created.
val mas = ma_ groupBy(0,1) sum(2)
mas.writeAsCsv("c:\\flink\\mas.csv")
results in the creation of a directory named "mas.csv" containing two files, "1" and "2". When does something like this happen? I used Flink 0.9.1 in local mode, Windows 7, Scala 2.10, and Eclipse 3.0.3.
Upvotes: 2
Views: 1511
Reputation: 62330
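This is expected behavior. Each parallel instance of the sink writes its own file, so a sink running with a parallelism greater than one produces a directory with one numbered file per instance. If you want to get a single output file, you need to set the parallelism for the sink to one.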
dataset.writeAsCsv("filename").setParallelism(1)
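For context, here is a minimal sketch of a complete DataSet job that ends in a single-file CSV sink. It assumes the Flink Scala DataSet API is on the classpath. The object name SingleFileSink, the helper run, and the sample tuples are hypothetical stand-ins for the asker's ma_ data set and are not part of the original question.

```scala
import org.apache.flink.api.scala._

object SingleFileSink {
  // Hypothetical helper: builds a small job and writes its result to outPath.
  def run(outPath: String): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data standing in for the asker's ma_ data set.
    val ma: DataSet[(Int, Int, Int)] =
      env.fromElements((1, 1, 10), (1, 1, 5), (2, 3, 7))

    // Same aggregation as in the question: group on fields 0 and 1, sum field 2.
    val mas = ma.groupBy(0, 1).sum(2)

    // Only the sink is forced to parallelism 1, so a single file is written
    // at outPath; the grouping and summing can still run in parallel.
    mas.writeAsCsv(outPath).setParallelism(1)

    // Sinks are lazy: nothing is written until the job is executed.
    env.execute("single-file CSV sink")
  }

  def main(args: Array[String]): Unit = run(args(0))
}
```

Leaving out setParallelism(1) in this sketch should reproduce the behavior from the question whenever the environment's default parallelism is greater than one, for example on a multi-core machine in local mode.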
For the DataStream API, you need to insert an additional rebalance() to break the operator chain. Otherwise, the whole chain will be executed with a parallelism of 1, or setParallelism() might be ignored.
datastream.rebalance().writeAsCsv("filename").setParallelism(1)
Upvotes: 6