how to stream a json using flink?
Note on working with JSON in Flink:
Use JSONDeserializationSchema
to deserialize the events, which will produce ObjectNode
s. You can map the ObjectNode
to YourObject
for convenience or continue working with the ObjectNode
.
Tutorial on working with ObjectNode
: http://www.baeldung.com/jackson-json-node-tree-model
Back to your case, you can do it like the following:
val eventStream : DataStream [ObjectNode] = oneMinuteAgg .addSource(source) .windowAll() .TimeWindow(Time.minutes(1)) .trigger(new MyTriggerFunc) .aggregation(new MyAggregationFunc)
will output a stream of 1min aggregates
[ { "date" :2018-03-03 "sum" : 120 }, { "date" :2018-03-03 "sum" : 120 }]
then chain another operator to the "oneMinuteAgg" that will add the 1min aggregates into 1day aggregates:
[...]oneMinuteAgg .windowAll() .TimeWindow(Time.days(1)) .trigger(new Whatever) .aggregation(new YourDayAggF)
that will output what you need
{ "aggregationType" : "day" "days before" : 4 "aggregates : [{ "date" :2018-03-03 "sum" : 120 }, { "date" :2018-03-03 "sum" : 120 }]}
I used windowAll()
assuming you don't need to key the stream.