houcros

Reputation: 1020

In Flink, how to write DataStream to single file?

The writeAsText or writeAsCsv methods of a DataStream write as many files as worker threads. As far as I could see, the methods only let you specify the path to these files and some formatting.

For debugging and testing purposes, it would be really useful to be able to print everything to a single file, without having to change the setup to use a single worker thread.

Is there a reasonably simple way to achieve this? I suspect it should be possible by implementing a custom SinkFunction, but I'm not sure about that (besides, it feels like a hassle for something that seems relatively simple).

Upvotes: 8

Views: 7405

Answers (2)

eseuteo

Reputation: 53

As of Flink 1.13, writeAsText is deprecated, so this should no longer be done with that function.

Instead, use the StreamingFileSink class together with the addSink operation. Setting the parallelism to 1 is also done differently: call setParallelism(1) on the StreamExecutionEnvironment.

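import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Run every operator, including the sink, with a single parallel instance
env.setParallelism(1)

// outPath is the base directory; the sink writes its part files beneath it
val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path(outPath), new SimpleStringEncoder[String]("UTF-8"))
  .build()

dataStream.map(_.toString).addSink(sink)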

Upvotes: 4

Robert Metzger

Reputation: 4542

You can achieve this by setting the parallelism of the sink to 1. This way, the writing is done by a single sink task, which produces a single file, while the rest of the job keeps its parallelism.

dataStream.writeAsText(path).setParallelism(1);
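For context, here is a minimal sketch of how that call might sit in a complete job. It assumes a Flink 1.x release that still ships the Scala DataStream API and writeAsText; the object name, the sample elements, the job name, and the /tmp output path are made up for illustration.

```scala
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala._

// Hypothetical debug job; names and paths are placeholders.
object SingleFileDebugJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The rest of the pipeline keeps its normal parallelism
    env.setParallelism(4)

    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    stream
      .map(_.toUpperCase)
      // OVERWRITE replaces output left over from a previous run
      .writeAsText("/tmp/flink-debug-output.txt", WriteMode.OVERWRITE)
      // Only the sink is reduced to one parallel instance
      .setParallelism(1)

    env.execute("single-file-debug")
  }
}
```

Because setParallelism(1) is applied to the sink returned by writeAsText rather than to the environment, only the write step is funnelled through one task. The order of lines in the file is not guaranteed when upstream operators run in parallel.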

Upvotes: 14
