py-r
py-r

Reputation: 451

PyFlink - JSON file sink?

Is it possible to use a JSON file sink in the Table API and/or DataStream API the same way as for CSV ?

Thanks !

Code

my_sink_ddl = f"""
    create table mySink (
        id STRING,
        dummy_item STRING
    ) with (
        'connector.type' = 'filesystem',
        'format.type' = 'json',
        'connector.path' = 'output.json'
    )
"""

Error

TableException: findAndCreateTableSink failed.

Upvotes: 0

Views: 969

Answers (1)

Mikalai Lushchytski
Mikalai Lushchytski

Reputation: 1641

Yes, according to the Jira FLINK-17286 Integrate json to file system connector and the corresponding pull request [FLINK-17286][connectors / filesystem]Integrate json to file system connector #12010, it is possible starting from Flink 1.11. Prior to Flink 1.11 I believe it was not supported.

You need to use following config:

... with (
        'connector' = 'filesystem',
        'format' = 'json',
        'path' = 'output_json' -- This must be a directory
    )

Plus following environment definition:

t_env = BatchTableEnvironment.create(   environment_settings=EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()) 

Upvotes: 4

Related Questions