how to stream a json using flink? how to stream a json using flink? json json

how to stream a json using flink?


Note on working with JSON in Flink:

Use JSONDeserializationSchema to deserialize the events, which will produce ObjectNodes. You can map the ObjectNode to YourObject for convenience or continue working with the ObjectNode.

Tutorial on working with ObjectNode: http://www.baeldung.com/jackson-json-node-tree-model

Back to your case, you can do it like the following:

val eventStream : DataStream [ObjectNode] = oneMinuteAgg    .addSource(source)    .windowAll()    .TimeWindow(Time.minutes(1))    .trigger(new MyTriggerFunc)    .aggregation(new MyAggregationFunc)

will output a stream of 1min aggregates

[           {      "date" :2018-03-03      "sum" : 120      },       {      "date" :2018-03-03      "sum" : 120      }]

then chain another operator to the "oneMinuteAgg" that will add the 1min aggregates into 1day aggregates:

[...]oneMinuteAgg        .windowAll()        .TimeWindow(Time.days(1))        .trigger(new Whatever)        .aggregation(new YourDayAggF)

that will output what you need

{    "aggregationType" : "day"    "days before" : 4    "aggregates : [{      "date" :2018-03-03      "sum" : 120      },       {      "date" :2018-03-03      "sum" : 120      }]}

I used windowAll() assuming you don't need to key the stream.