Dodge_X
Dodge_X

Reputation: 93

How to sink data into a single json file using flink v1.13+?

I want to sink my filtered data into a single json file,but got several parts in an output directory. Here is my code:

         String sinkSql =
                "create TEMPORARY table FileSink(" +
                        "name STRING," +
                        "timestamps BIGINT," +
                        "temp DOUBLE" +
                        ") WITH(" +
                        "'connector'='filesystem'," +
                        "'path'='E:\\output'," +
                        "'format' ='json')";
        tableEnvironment.executeSql(sinkSql);

        filter.executeInsert("FileSink");

Upvotes: 0

Views: 888

Answers (1)

David Anderson
David Anderson

Reputation: 43612

I don't believe there's a simple way to do this.

But what you could do is

  • run the job with a parallelism of 1
  • convert the result table into a datastream
  • convert that stream of rows into a stream of json strings (which might be more easily done by converting rows to POJOs to json)
  • use stream.writeAsText("/path/to/file") to produce the output

Flink 1.15 will add more JSON support in the table API. Perhaps with that release it will become possible to do this using the PrintSink -- not sure.

Upvotes: 1

Related Questions