Reputation: 1940
I am trying to sink a stream to the filesystem in CSV format using PyFlink, but it does not work.
# stream_to_csv.py
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
To run the script:
$ python stream_to_csv.py
I expect records to land in the /tmp/output folder, but that doesn't happen.
$ ls /tmp/output
(nothing shown here)
Am I missing anything?
Upvotes: 2
Views: 1062
Reputation: 1940
I'm shamelessly copying Dian Fu's reply from http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Not-able-to-sink-a-stream-into-csv-td43105.html.
You need to set a rolling policy for the filesystem connector; refer to the Rolling Policy section [1] for more details.
Actually, there is output: run ls -la /tmp/output/ and you will see several files named “.part-xxx”. These are in-progress files that have not been committed yet, which is why a plain ls shows nothing.
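The dot-prefix behavior can be seen without running Flink at all (a minimal demo in a scratch directory; the file name is made up):

```shell
# In-progress part files are dot-prefixed, so a plain `ls` hides them.
d=$(mktemp -d)
touch "$d/.part-0-0.inprogress"
ls "$d"       # prints nothing
ls -a "$d"    # lists .part-0-0.inprogress (plus . and ..)
```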
For your job, you need to set execution.checkpointing.interval in the configuration and sink.rolling-policy.rollover-interval in the properties of the filesystem connector.
The job will look like the following:
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(environment_settings=env_settings)
# checkpointing is required for the streaming filesystem sink to commit (finalize) files
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
table_env.execute_sql("""
CREATE TABLE datagen (
id INT,
data STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
)
""")
table_env.execute_sql("""
CREATE TABLE print (
id INT,
data STRING
) WITH (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '/tmp/output',
'sink.rolling-policy.rollover-interval' = '10s'
)
""")
table_env.execute_sql("""
INSERT INTO print
SELECT id, data
FROM datagen
""").wait()
Upvotes: 1