Reputation: 1610
I want to load a bunch of csv files into Apache Flink in a given order, e.g. as determined by a naming scheme in the file names, which could contain some time stamp information.
In Apache Spark I can stream files to a data set as soon as they are moved to a specific directory (e.g. /data/staging) with an atomic file move as follows
Dataset<Row> fileStreamDf = spark.readStream()
    .option("header", true)
    .schema(schema)
    .csv("/data/staging");
I would then move the files one by one in the given order, e.g. with a bash script, to that staging directory.
How can I achieve the same thing with Apache Flink?
Upvotes: 0
Views: 1416
Reputation: 9245
It's not exactly the same use case, but we had to do something similar in a streaming job (the files were HDF5, not CSV). I wrote a RichSourceFunction that knows how to iterate over the files in the proper order, and emits the file paths (these are in S3) as String records. A downstream FlatMapFunction then parses each file and emits the actual rows.
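The key piece is iterating the files in a deterministic order. As a minimal sketch of that ordering logic (assuming a hypothetical `data-<epochSeconds>.csv` naming scheme; the Flink wiring itself is omitted, but this is the order a RichSourceFunction's run() loop would emit the paths in):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OrderedFilePaths {

    // Hypothetical naming scheme: data-<epochSeconds>.csv, e.g. data-1700000100.csv.
    private static final Pattern TS = Pattern.compile("data-(\\d+)\\.csv");

    // Sorts the paths by the timestamp embedded in the file name.
    // A RichSourceFunction would emit the sorted paths one by one from run().
    public static List<String> ordered(List<String> paths) {
        List<String> sorted = new ArrayList<>(paths);
        sorted.sort(Comparator.comparingLong(OrderedFilePaths::timestampOf));
        return sorted;
    }

    private static long timestampOf(String path) {
        Matcher m = TS.matcher(path);
        if (!m.find()) {
            throw new IllegalArgumentException("unexpected file name: " + path);
        }
        return Long.parseLong(m.group(1));
    }
}
```

A downstream FlatMapFunction<String, String[]> would then open each emitted path, parse the CSV, and collect one record per row.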
Upvotes: 1