
Flink + Kafka + JSON - java example


I followed Vishnu Viswanath's answer, but JSONKeyValueDeserializationSchema raises an exception during JSON parsing, even for a simple message such as {"name":"John Doe"}.

The code that throws is:

    DataStream<ObjectNode> messageStream = env.addSource(
            new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                    new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(ObjectNode node) throws Exception {
            return "Kafka and Flink says: " + node.get(0);
        }
    }).print();

Output:

    09/05/2016 11:16:02 Job execution switched to status FAILED.
    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.NullPointerException
        at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
        at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
        at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
        at java.lang.Thread.run(Thread.java:745)
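The NullPointerException is raised inside Jackson's JsonFactory.createParser, called from ObjectMapper.readValue, so the schema is handing a null byte array to the parser. A likely cause (an inference from the trace, not confirmed in the post) is that the Kafka records were produced without a key: this version of JSONKeyValueDeserializationSchema appears to parse the key bytes unconditionally, and a keyless record arrives with a null key. The sketch below is dependency-free and only illustrates the null guard a custom deserialization schema would need: a Map<String, String> stands in for Jackson's ObjectNode, plain UTF-8 decoding stands in for ObjectMapper.readValue, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free sketch: Map<String, String> stands in for ObjectNode,
// and parse() (UTF-8 decoding) stands in for ObjectMapper.readValue.
final class KeyValueWrapper {
    private KeyValueWrapper() {}

    // Like the parser in the trace, this dereferences its argument:
    // new String(null, charset) throws NullPointerException.
    static String parse(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Unguarded: parses the key unconditionally, so a keyless record fails.
    static Map<String, String> wrapUnguarded(byte[] messageKey, byte[] message) {
        Map<String, String> node = new LinkedHashMap<>();
        node.put("key", parse(messageKey));
        node.put("value", parse(message));
        return node;
    }

    // Null-safe: only parses the key (and the value) when the bytes are present.
    static Map<String, String> wrapNullSafe(byte[] messageKey, byte[] message) {
        Map<String, String> node = new LinkedHashMap<>();
        if (messageKey != null) {
            node.put("key", parse(messageKey));
        }
        if (message != null) {
            node.put("value", parse(message));
        }
        return node;
    }
}
```

In a real job the same `if (messageKey != null)` check would sit inside your own deserialization schema; producing the records with a key should also avoid the null.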

I succeeded using a different deserialization schema, JSONDeserializationSchema:

    DataStream<ObjectNode> messageStream = env.addSource(
            new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                    new JSONDeserializationSchema(), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(ObjectNode value) throws Exception {
            return "Kafka and Flink says: " + value.get("key").asText();
        }
    }).print();
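With JSONDeserializationSchema each stream element is the parsed message body itself, so fields are read by their own names: for {"name":"John Doe"} that is value.get("name"), and value.get("key") only returns something if the message has a top-level "key" field. In Jackson, get(field) returns null for a missing field, so chaining .asText() throws, whereas path(field) returns a MissingNode whose asText() is the empty string. The dependency-free sketch below mirrors that difference, assuming a Map<String, String> as a stand-in for ObjectNode; the class and helper names are hypothetical.

```java
import java.util.Map;

// Hypothetical, dependency-free sketch: Map<String, String> stands in for ObjectNode.
final class FieldAccess {
    private FieldAccess() {}

    // Mirrors node.get(field).asText(): fails when the field is missing.
    static String getText(Map<String, String> node, String field) {
        String v = node.get(field);
        if (v == null) {
            throw new NullPointerException("no field named " + field);
        }
        return v;
    }

    // Mirrors node.path(field).asText(): a missing field yields "".
    static String pathText(Map<String, String> node, String field) {
        return node.getOrDefault(field, "");
    }

    // The map() body from above, reading the field the sample message actually has.
    static String describe(Map<String, String> node) {
        return "Kafka and Flink says: " + pathText(node, "name");
    }
}
```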


Your error is at the line messageStream.map(new MapFunction<String, String>() { ... }). The MapFunction you defined expects an input of type String and returns a String, but JSONKeyValueDeserializationSchema deserializes each Kafka record into a com.fasterxml.jackson.databind.node.ObjectNode, so your MapFunction must take that same type, ObjectNode, as its input. Try the code below.

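    messageStream.map(new MapFunction<ObjectNode, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(ObjectNode node) throws Exception {
            // The schema wraps each record as {"key": ..., "value": ...};
            // the message body is under "value" (node.get(0) returns null on an ObjectNode).
            return "Kafka and Flink says: " + node.get("value");
        }
    });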
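With JSONKeyValueDeserializationSchema the ObjectNode is a wrapper rather than the message: the parsed key and message body are stored under top-level "key" and "value" fields (plus a "metadata" object with offset, topic and partition when the constructor argument is true). On an ObjectNode, get(0) is an array-index lookup that returns null, so a map body using node.get(0) would print "Kafka and Flink says: null"; the message fields are reached through node.get("value"), for example node.get("value").get("name"). The sketch below models that nesting with plain maps standing in for Jackson nodes; the class and method names are hypothetical.

```java
import java.util.Map;

// Hypothetical, dependency-free sketch: nested maps stand in for the wrapper node
// produced by JSONKeyValueDeserializationSchema, i.e. {"key": {...}, "value": {...}}.
final class KeyValueAccess {
    private KeyValueAccess() {}

    // Equivalent of node.get("value").get(field), tolerating a missing "value" entry.
    static String valueField(Map<String, Map<String, String>> node, String field) {
        Map<String, String> value = node.get("value");
        return value == null ? null : value.get(field);
    }

    // The map() body, reading the "name" field of the message body.
    static String describe(Map<String, Map<String, String>> node) {
        return "Kafka and Flink says: " + valueField(node, "name");
    }
}
```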