Flink + Kafka + JSON - Java example
I followed Vishnu Viswanath's answer, but JSONKeyValueDeserializationSchema raises an exception during the JSON parsing step, even for a simple JSON such as {"name":"John Doe"}.
The code that throws is:
```java
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"),
                new JSONKeyValueDeserializationSchema(false),
                parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();
```
Output:
```
09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
    at java.lang.Thread.run(Thread.java:745)
```
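The NullPointerException originates inside JSONKeyValueDeserializationSchema.deserialize, where the schema runs Jackson over both the record key and the record value. A likely cause is that the messages were produced without a Kafka key, so the key byte array is null and Jackson's createParser fails. A minimal sketch of a null-tolerant variant (the class name is my own; the interface methods are the Flink 1.x KeyedDeserializationSchema contract) could look like this:

```java
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch: like JSONKeyValueDeserializationSchema, but skips key/value when absent
// instead of NPE-ing on a null byte[].
public class NullSafeJsonKeyValueSchema implements KeyedDeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = 1L;
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (messageKey != null) { // the stock schema parses this unconditionally
            node.set("key", mapper.readValue(messageKey, JsonNode.class));
        }
        if (message != null) {
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}
```

With this schema in place of JSONKeyValueDeserializationSchema, records without a key simply yield an envelope with no "key" field rather than crashing the fetcher.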
I succeeded using another deserialization schema, JSONDeserializationSchema:
```java
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"),
                new JSONDeserializationSchema(),
                parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
```
Your error is at the line messageStream.map(new MapFunction<String, String>()). The MapFunction you defined expects an input of type String and an output of type String, but since you are using a JSONKeyValueDeserializationSchema, which deserializes each record into a com.fasterxml.jackson.databind.node.ObjectNode, your MapFunction should actually expect an input of type ObjectNode. Try the code below.
```java
messageStream.map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        // JSONKeyValueDeserializationSchema wraps each record as {"key": ..., "value": ...},
        // so read the payload via get("value"); get(0) is an array-index lookup
        // and returns null on an object node.
        return "Kafka and Flink says: " + node.get("value");
    }
});
```
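For reference, the ObjectNode that JSONKeyValueDeserializationSchema hands to the MapFunction is an envelope of the form {"key": ..., "value": ...}. A small Jackson-only sketch (the sample payload is made up) shows why get(0) yields null on such a node while get("value") does not:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class EnvelopeDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Same shape as the envelope built by JSONKeyValueDeserializationSchema
        ObjectNode node = (ObjectNode) mapper.readTree(
                "{\"key\":{\"id\":1},\"value\":{\"name\":\"John Doe\"}}");

        // get(int) is an array-index lookup; on an object node it returns null
        System.out.println(node.get(0));                              // null
        // get(String) is a field lookup
        System.out.println(node.get("value").get("name").asText());   // John Doe
    }
}
```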