Daniel

Apache Flink: streaming files from a directory

I want to load a set of CSV files into Apache Flink in a given order, e.g. an order determined by a naming scheme in which the file names contain timestamp information.

In Apache Spark, I can stream files into a Dataset as soon as they are atomically moved into a specific directory (e.g. /data/staging), as follows:

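// Streaming Dataset that picks up each new CSV file moved into /data/staging
Dataset<Row> fileStreamDf = spark.readStream()
            .option("header", true)
            .schema(schema)   // StructType defined elsewhere; streaming file sources need an explicit schema
            .csv("/data/staging");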

I would then move the files into that staging directory one by one, in the desired order, e.g. with a bash script.
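As a rough Java stand-in for that bash script, here is a minimal sketch that sorts the CSV files by name and atomically moves them into the staging directory one at a time. The class `OrderedStager`, the pause between moves, and the assumption that file names begin with a lexicographically sortable timestamp (e.g. `20240101T120000_part.csv`) are hypothetical, not taken from the question.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: stages CSV files one at a time, in file-name order.
class OrderedStager {

    // Returns the *.csv files in dir, sorted by file name.
    // Assumes names start with a sortable timestamp, so name order == time order.
    static List<Path> sortedCsvFiles(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.csv")) {
            for (Path p : stream) {
                files.add(p);
            }
        }
        files.sort(Comparator.comparing((Path p) -> p.getFileName().toString()));
        return files;
    }

    // Atomically moves each file into stagingDir, pausing between moves so a
    // directory-watching reader sees them arrive in order. ATOMIC_MOVE throws
    // AtomicMoveNotSupportedException if the directories are on different file systems.
    static List<Path> stageInOrder(Path sourceDir, Path stagingDir, long pauseMillis)
            throws IOException, InterruptedException {
        List<Path> moved = new ArrayList<>();
        for (Path src : sortedCsvFiles(sourceDir)) {
            Path target = stagingDir.resolve(src.getFileName());
            Files.move(src, target, StandardCopyOption.ATOMIC_MOVE);
            moved.add(target);
            Thread.sleep(pauseMillis);
        }
        return moved;
    }
}
```

The atomic move matters because a file-watching reader should never see a half-written file; the pause only spaces the files out and does not by itself guarantee that the reader processes them in order.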

How can I achieve the same thing with Apache Flink?

Answers (1)

kkrugler

It's not exactly the same use case, but we had to do something similar in a streaming job (our files are HDF5, not CSV). I wrote a RichSourceFunction that iterates over the files in the proper order and emits their paths (the files live in S3) as String records. A downstream FlatMapFunction then parses each file and emits the actual rows.
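A minimal plain-Java sketch of the two stages described above, with the Flink wiring left out so it runs on its own. `emitPathsInOrder` stands in for the logic inside the source function's `run()` (in Flink, each path would be passed to `SourceContext#collect`), and `parseCsvFile` stands in for the downstream `FlatMapFunction#flatMap` (which would emit rows through its `Collector`). The class and method names, the `_<yyyyMMddHHmmss>.csv` naming scheme, and the naive comma splitting are assumptions for illustration; the answer's actual job reads HDF5 files from S3.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java sketch of the source / flat-map split; no Flink dependency.
class OrderedFilePipeline {

    // Hypothetical naming scheme: "<anything>_<yyyyMMddHHmmss>.csv".
    private static final Pattern TS = Pattern.compile("_(\\d{14})\\.csv$");

    // Stage 1 (source): emit file paths in timestamp order.
    // Fixed-width digits mean string order equals chronological order.
    static void emitPathsInOrder(List<Path> files, Consumer<String> out) {
        List<Path> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.comparing((Path p) -> timestampOf(p)));
        for (Path p : sorted) {
            out.accept(p.toString());
        }
    }

    static String timestampOf(Path p) {
        Matcher m = TS.matcher(p.getFileName().toString());
        if (!m.find()) {
            throw new IllegalArgumentException("no timestamp in " + p);
        }
        return m.group(1);
    }

    // Stage 2 (flat map): parse one file and emit one record per data row,
    // skipping the header line. Naive comma split, no quoted-field handling.
    static void parseCsvFile(String path, Consumer<String[]> out) throws IOException {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
            reader.readLine(); // skip header
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    out.accept(line.split(",", -1));
                }
            }
        }
    }
}
```

Keeping the source down to emitting paths means only one place has to know the ordering. Note that if the parsing operator runs with parallelism greater than 1, rows from different files can interleave, so strict cross-file ordering would presumably require running it with parallelism 1.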