
Run Apache Flink with Amazon S3


Update, May 2016: the Flink documentation now has a page on how to use Flink with AWS.


This question has also been asked on the Flink user mailing list, and I've answered it there: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3-data-with-Apache-Flink-td3046.html

tl;dr:

Flink program

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3FileSystem {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
        // s3n:// paths are handled by Hadoop's NativeS3FileSystem (configured below)
        DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
        myLines.print();
    }
}
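Writing results back to S3 works the same way once the S3 file system is configured (see the next step). A minimal sketch, with a hypothetical output path; note that file sinks such as writeAsText() require an explicit execute() call:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3WriteBack {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
        DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
        // Hypothetical output location; any s3n:// path the configured credentials can write to
        myLines.writeAsText("s3n://my-bucket-name/output/");
        // File sinks are lazy, so trigger the job explicitly
        ee.execute("Write back to S3");
    }
}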

Add the following to core-site.xml and make it available to Flink:

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>putKeyHere</value>
</property>
<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>putSecretHere</value>
</property>
<property>
    <name>fs.s3n.impl</name>
    <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
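To make this configuration visible to Flink, the Flink versions current at the time of writing let you point the fs.hdfs.hadoopconf option in conf/flink-conf.yaml to the directory that contains your core-site.xml (the path below is a placeholder):

fs.hdfs.hadoopconf: /path/to/hadoop/conf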


As a second example, taken from the AWS blog post linked below: you can retrieve the artifacts from the S3 bucket that is specified in the output section of the CloudFormation template. Once the Flink runtime is up and running, the taxi stream processor program can be submitted to it to start the real-time analysis of the trip events in the Amazon Kinesis stream.

$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»

Both of the examples above use Amazon S3 as the source, so you have to adjust the bucket and artifact names accordingly.

Note: you can follow the link below to build a complete real-time stream processing pipeline using EMR and S3 buckets.

https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/