Reputation: 133
We are using Flink 1.2.0 with the suggested S3AFileSystem configuration. A simple streaming job works as expected when its source is a single folder within an S3 bucket.
The job runs without errors--but does not produce output--when its source is a folder which itself contains subfolders.
For clarity, below is a model of the S3 bucket. Pointing the job to s3a://bucket/folder/2017/04/25/01/ properly reads all three objects and any subsequent objects that appear in the bucket. Pointing the job to s3a://bucket/folder/2017/ (or any other intermediate folder) leads to a job that runs without producing anything.
In fits of desperation, we've tried the permutations that [in|ex]clude the trailing /.
.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26
Job code:
def main(args: Array[String]) {
  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")
  val path = s"s3a://$bucket/$folder"
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val lines: DataStream[String] = env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)
  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}
core-site.xml is configured per the docs:
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>
We have included all the jars for S3AFileSystem listed here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27
We are stumped. This seems like it should work; there are plenty of breadcrumbs on the internet that indicate that this did work. [e.g., http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]
Help me, fellow squirrels... you're my only hope!
Upvotes: 3
Views: 3302
Reputation: 11
In Flink 1.7.x, Flink provides two file systems to talk to Amazon S3: flink-s3-fs-presto and flink-s3-fs-hadoop. Both register default FileSystem wrappers for URIs with the s3:// scheme. In addition, flink-s3-fs-hadoop registers for s3a:// and flink-s3-fs-presto registers for s3p://, so you can use those schemes to run both at the same time.
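Sample code:
// Reading data from S3.
// Prints, line by line, the contents of the files directly under the bucket root;
// nested folders are skipped unless nested file enumeration is enabled on the input format.
// Assumed: env was not defined in the original snippet.
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final Path directory = new Path("s3a://husnain28may2020/");
// Resolves the file system registered for the s3a:// scheme (not used further below).
final FileSystem fs = directory.getFileSystem();
// Using an input format
org.apache.flink.api.java.io.TextInputFormat textInputFormatS3 =
    new org.apache.flink.api.java.io.TextInputFormat(directory);
DataSet<String> linesS3 = env.createInput(textInputFormatS3);
linesS3.print();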
Upvotes: 0
Reputation: 133
Answering my own question... with help from Steve Loughran above.
In Flink, when a file-based data source is processed continuously, FileInputFormat does not enumerate nested files by default.
This is true whether the source is S3 or anything else.
You must enable nested file enumeration explicitly, like so:
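// Imports assumed for Flink 1.2; they were not shown in the original post.
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

def main(args: Array[String]) {
  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")
  val path = s"s3a://$bucket/$folder"
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val textInputFormat = new TextInputFormat(new Path(path))
  // This is important! Without it, files in subfolders of `path` are never picked up.
  textInputFormat.setNestedFileEnumeration(true)
  val lines: DataStream[String] = env.readFile(
    inputFormat = textInputFormat,
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)
  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}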
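To see why the intermediate folder produced nothing, here is a minimal sketch in plain Java (java.nio only, no Flink) that rebuilds the layout from the question in a local temp directory and lists it with and without descending into subfolders. The class and method names (NestedEnumerationSketch, enumerate, buildSampleTree) are hypothetical, and this is only an analogy for the flag's effect, not Flink's actual implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NestedEnumerationSketch {

    // Lists regular files under root, relative to root. With nested == false only
    // direct children are considered, which mimics a source that skips subfolders.
    public static List<String> enumerate(Path root, boolean nested) {
        int depth = nested ? Integer.MAX_VALUE : 1;
        try (Stream<Path> paths = Files.walk(root, depth)) {
            return paths.filter(Files::isRegularFile)
                        .map(p -> root.relativize(p).toString().replace('\\', '/'))
                        .sorted()
                        .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Builds a throwaway local copy of the bucket layout shown in the question.
    public static Path buildSampleTree() {
        String[] files = {
            "2017/04/25/00/a.txt", "2017/04/25/00/b.txt",
            "2017/04/25/01/c.txt", "2017/04/25/01/d.txt", "2017/04/25/01/e.txt"
        };
        try {
            Path root = Files.createTempDirectory("folder");
            for (String f : files) {
                Path p = root.resolve(f);
                Files.createDirectories(p.getParent());
                Files.createFile(p);
            }
            Files.createDirectories(root.resolve("2017/04/26"));
            return root;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path root = buildSampleTree();
        // Intermediate folder, flat listing: only the directory "04" is there, so no files.
        System.out.println(enumerate(root.resolve("2017"), false));          // []
        // Leaf folder, flat listing: the three files are direct children.
        System.out.println(enumerate(root.resolve("2017/04/25/01"), false)); // [c.txt, d.txt, e.txt]
        // Intermediate folder, nested listing: all five files are found.
        System.out.println(enumerate(root.resolve("2017"), true));
    }
}
```

The first call mirrors the symptom in the question: the listing succeeds without error but has nothing to hand to the job.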
Upvotes: 9
Reputation: 13430
What version of Hadoop is under this?
If this has stopped working with Hadoop 2.8, it is probably a regression, meaning probably my fault. First file a JIRA at issues.apache.org under FLINK; then, if it's new in 2.8.0, link it as broken-by HADOOP-13208.
The code snippet here is a good example that could be used for a regression test, and it's time I did some for Flink.
That big listFiles() change moves the enumeration of files under a path from a recursive treewalk to a series of flat lists of all child entries under a path. It works fantastically for everything else (distcp, tests, Hive, Spark) and has been shipping in products since Dec '16. I'd be somewhat surprised if it is the cause, but I'm not denying blame. Sorry.
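For readers unfamiliar with the difference, the following is a rough sketch of a recursive treewalk versus a flat listing over an object store modelled as a sorted set of keys. Everything here (ListingSketch, KEYS, listChildren, treewalk, flatList) is hypothetical and is not Hadoop's S3A code; it only illustrates that both strategies should return the same files, with the flat variant needing a single pass instead of one listing per directory level.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

class ListingSketch {

    // Hypothetical stand-in for a bucket: object stores hold flat keys, not real directories.
    public static final TreeSet<String> KEYS = new TreeSet<>(Arrays.asList(
        "folder/2017/04/25/00/a.txt",
        "folder/2017/04/25/00/b.txt",
        "folder/2017/04/25/01/c.txt",
        "folder/2017/04/25/01/d.txt",
        "folder/2017/04/25/01/e.txt"));

    // One "directory" listing: immediate children of prefix; pseudo-directories end with '/'.
    public static TreeSet<String> listChildren(String prefix) {
        TreeSet<String> children = new TreeSet<>();
        for (String key : KEYS.tailSet(prefix)) {
            if (!key.startsWith(prefix)) break;   // left the prefix range in the sorted set
            if (key.equals(prefix)) continue;     // ignore a marker object for the prefix itself
            int slash = key.indexOf('/', prefix.length());
            children.add(slash < 0 ? key : key.substring(0, slash + 1));
        }
        return children;
    }

    // Recursive treewalk: one listing call per directory level.
    public static List<String> treewalk(String prefix) {
        List<String> files = new ArrayList<>();
        for (String child : listChildren(prefix)) {
            if (child.endsWith("/")) {
                files.addAll(treewalk(child));
            } else {
                files.add(child);
            }
        }
        return files;
    }

    // Flat listing: a single pass over every key that starts with prefix.
    public static List<String> flatList(String prefix) {
        List<String> files = new ArrayList<>();
        for (String key : KEYS.tailSet(prefix)) {
            if (!key.startsWith(prefix)) break;
            files.add(key);
        }
        return files;
    }

    public static void main(String[] args) {
        System.out.println(treewalk("folder/2017/"));
        System.out.println(flatList("folder/2017/"));
    }
}
```

In this sketch both calls print the same five keys, which is why a change of this kind would not by itself be expected to hide files under an intermediate folder.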
Upvotes: 0