Flink using S3AFileSystem does not read subfolders from S3
Answering my own question... with help from Steve Loughran above.
In Flink, when working with a file-based data source for continuous processing, FileInputFormat does not enumerate nested files by default. This is true whether the source is S3 or anything else. You must enable it explicitly:
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object Job {
  def main(args: Array[String]): Unit = {
    val parameters = ParameterTool.fromArgs(args)
    val bucket = parameters.get("bucket")
    val folder = parameters.get("folder")
    val path = s"s3a://$bucket/$folder"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val textInputFormat = new TextInputFormat(new Path(path))
    // This is important!
    textInputFormat.setNestedFileEnumeration(true)

    val lines: DataStream[String] = env.readFile(
      inputFormat = textInputFormat,
      filePath = path,
      watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
      interval = Time.seconds(10).toMilliseconds)

    lines.print()
    env.execute("Flink Streaming Scala API Skeleton")
  }
}
What version of Hadoop is under this?
If this has stopped working with Hadoop 2.8, it is probably a regression, meaning probably my fault. First file a JIRA at issues.apache.org under FLINK; then, if it's new in 2.8.0, link it as broken-by HADOOP-13208.
The code snippet here is a good example that could be used for a regression test, and it's time I did some for Flink.
That big listFiles() change moves the enumeration of files under a path from a recursive treewalk to a series of flat lists of all child entries under a path. It works fantastically for everything else (distcp, tests, Hive, Spark) and has been shipping in products since Dec 2016; I'd be somewhat surprised if it is the cause, but I'm not denying blame. Sorry.
As of Flink 1.7.x, Flink provides two file systems to talk to Amazon S3: flink-s3-fs-presto and flink-s3-fs-hadoop. Both flink-s3-fs-hadoop and flink-s3-fs-presto register default FileSystem wrappers for URIs with the s3:// scheme. In addition, flink-s3-fs-hadoop registers for s3a:// and flink-s3-fs-presto registers for s3p://, so you can use both at the same time.
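Either implementation has to be on Flink's classpath before the cluster starts; in the 1.7.x line this is typically done by copying the respective jar from the distribution's opt/ directory into lib/. A minimal sketch (the version number in the jar names is illustrative; match it to your Flink distribution):

```shell
# Make both S3 filesystem implementations available to Flink (1.7.x style).
# Run from the root of the Flink distribution; jar versions are illustrative.
cp ./opt/flink-s3-fs-hadoop-1.7.2.jar ./lib/
cp ./opt/flink-s3-fs-presto-1.7.2.jar ./lib/
```

With both jars in place, s3a:// paths go through the Hadoop-based filesystem and s3p:// paths through the Presto-based one, which is what allows mixing them in a single job.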
Sample code:

// Reading data from S3: this will print all the contents in the bucket, line by line
final Path directory = new Path("s3a://husnain28may2020/");
final FileSystem fs = directory.getFileSystem();

// Using an input format
org.apache.flink.api.java.io.TextInputFormat textInputFormatS3 =
    new org.apache.flink.api.java.io.TextInputFormat(directory);
DataSet<String> linesS3 = env.createInput(textInputFormatS3);
linesS3.print();