
Flink using S3AFileSystem does not read subfolders from S3


Answering my own question... with help from Steve Loughran above.

In Flink, when working with a file-based data source to process continuously, FileInputFormat does not enumerate nested files by default.

This is true whether the source is S3 or anything else.

You must set it like so:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

def main(args: Array[String]) {
  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")
  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val textInputFormat = new TextInputFormat(new Path(path))

  // this is important!
  textInputFormat.setNestedFileEnumeration(true)

  val lines: DataStream[String] = env.readFile(
    inputFormat = textInputFormat,
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}


What version of Hadoop is under this?

If this has stopped working with Hadoop 2.8, it is probably a regression, meaning probably my fault. First file a JIRA @ issues.apache.org under FLINK; then, if it's new in 2.8.0, link it as broken-by HADOOP-13208.

The code snippet here is a good example that could be used for a regression test, and it's time I did some for Flink.
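Something like the following could be a starting point for such a test, using a local temp directory in place of S3 (all names here are placeholders, not from the snippet above):

import java.nio.file.Files

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path

object NestedEnumerationCheck {
  def main(args: Array[String]): Unit = {
    // build a nested tree: <tmp>/sub/leaf.txt
    val tmpDir = Files.createTempDirectory("nested-enum")
    val sub = Files.createDirectory(tmpDir.resolve("sub"))
    Files.write(sub.resolve("leaf.txt"), "hello".getBytes("UTF-8"))

    val format = new TextInputFormat(new Path(tmpDir.toUri))
    format.setNestedFileEnumeration(true)
    format.configure(new Configuration())

    // with nested enumeration on, the split list must cover the file in the subfolder
    val splits = format.createInputSplits(1)
    assert(splits.nonEmpty, "expected a split for the file in the subfolder")
  }
}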

That big listFiles() change moves the enumeration of files under a path from a recursive treewalk to a series of flat listings of all child entries under a path: it works fantastically for everything else (distcp, tests, Hive, Spark) and has been shipping in products since Dec '16; I'd be somewhat surprised if it is the cause, but I'm not denying blame. Sorry
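To make that concrete, here is a rough sketch of that flat listing through the Hadoop FileSystem API (bucket and prefix are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FlatListing {
  def main(args: Array[String]): Unit = {
    val root = new Path("s3a://some-bucket/some-prefix/")
    val fs = root.getFileSystem(new Configuration())

    // recursive = true returns every descendant file as one flat, paged
    // listing, instead of walking the directory tree one level at a time
    val files = fs.listFiles(root, true)
    while (files.hasNext) {
      println(files.next().getPath)
    }
  }
}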


As of Flink 1.7.x, Flink provides two file systems to talk to Amazon S3: flink-s3-fs-presto and flink-s3-fs-hadoop. Both register default FileSystem wrappers for URIs with the s3:// scheme; flink-s3-fs-hadoop also registers s3a:// and flink-s3-fs-presto also registers s3p://, so you can use both at the same time (see the sketch after the sample code below).

Sample code:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Reading data from S3: this will print all the contents in the bucket, line by line
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
final Path directory = new Path("s3a://husnain28may2020/");
final FileSystem fs = directory.getFileSystem();

// use the text input format to read the files under the path
TextInputFormat textInputFormatS3 = new TextInputFormat(directory);
DataSet<String> linesS3 = env.createInput(textInputFormatS3);
linesS3.print();
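And a minimal sketch (not from the answer above) of using both schemes in one job, assuming both plugin jars are available and Flink 1.7-era APIs; bucket names are placeholders:

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

object TwoS3FileSystemsJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // checkpoints go through flink-s3-fs-presto via the s3p:// scheme
    env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints"))

    // file reads go through flink-s3-fs-hadoop via the s3a:// scheme
    env.readTextFile("s3a://my-bucket/input/").print()

    env.execute("two S3 file systems in one job")
  }
}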