Logstash indexing JSON arrays
You can write your own filter (copy and paste the existing one, rename the class and the config_name, and rewrite the filter(event) method), or modify the current JSON filter (source on GitHub).

You can find the source code of the JSON filter (a Ruby class) at logstash-1.x.x\lib\logstash\filters\json.rb. The JSON filter parses the content as JSON as follows:
```ruby
begin
  # TODO(sissel): Note, this will not successfully handle json lists
  # like your text is '[ 1,2,3 ]' JSON.parse gives you an array (correctly)
  # which won't merge into a hash. If someone needs this, we can fix it
  # later.
  dest.merge!(JSON.parse(source))

  # If no target, we target the root of the event object. This can allow
  # you to overwrite @timestamp. If so, let's parse it as a timestamp!
  if !@target && event[TIMESTAMP].is_a?(String)
    # This is a hack to help folks who are mucking with @timestamp during
    # their json filter. You aren't supposed to do anything with
    # "@timestamp" outside of the date filter, but nobody listens... ;)
    event[TIMESTAMP] = Time.parse(event[TIMESTAMP]).utc
  end

  filter_matched(event)
rescue => e
  event.tag("_jsonparsefailure")
  @logger.warn("Trouble parsing json", :source => @source,
               :raw => event[@source], :exception => e)
  return
end
```
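As an aside, the limitation mentioned in the TODO comment is easy to reproduce in plain Ruby: parsing a top-level JSON list yields an Array, and Hash#merge! rejects anything that is not hash-like. A minimal sketch:

```ruby
require 'json'

# Parsing a JSON object yields a Hash, which merges cleanly into the event.
dest = {}
dest.merge!(JSON.parse('{"a": 1}'))

# Parsing a top-level JSON list yields an Array...
parsed = JSON.parse('[1, 2, 3]')
puts parsed.class # Array

# ...and Hash#merge! raises TypeError when given an Array,
# which is why the stock filter cannot handle JSON lists directly.
begin
  {}.merge!(parsed)
rescue TypeError => e
  puts "merge failed: #{e.class}"
end
```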
You can modify the parsing procedure to transform the original JSON before it is merged into the event, for example to stamp each element of an array with its index:
```ruby
json = JSON.parse(source)
if json.is_a?(Hash)
  json.each do |key, value|
    if value.is_a?(Array)
      value.each_with_index do |object, index|
        # modify as you need
        object["index"] = index
      end
    end
  end
end
# save modified json
# ......
dest.merge!(json)
```
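The same transformation can be tried outside Logstash as a plain Ruby function (the helper name add_indexes is hypothetical, just for illustration):

```ruby
require 'json'

# Sketch of the modification above: walk the top-level hash and
# stamp each hash element of every array value with its index.
def add_indexes(json)
  if json.is_a?(Hash)
    json.each do |_key, value|
      if value.is_a?(Array)
        value.each_with_index do |object, index|
          object["index"] = index if object.is_a?(Hash)
        end
      end
    end
  end
  json
end

doc = JSON.parse('{"members":[{"i":1},{"i":2}]}')
add_indexes(doc)
# each member now carries an "index" field: 0 for the first, 1 for the second
```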
Then modify your config file to use your new/modified JSON filter, and place it in logstash-1.x.x\lib\logstash\config.

This is my elastic_with_json.conf, using a modified json.rb filter:
```
input {
  stdin { }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    host => localhost
  }
  stdout { }
}
```
If you want to use your new filter instead, configure it with its config_name:
```ruby
class LogStash::Filters::Json_index < LogStash::Filters::Base
  config_name "json_index"
  milestone 2
  ....
end
```
and reference it in the config:
```
input {
  stdin { }
}
filter {
  json_index {
    source => "message"
  }
}
output {
  elasticsearch {
    host => localhost
  }
  stdout { }
}
```
Hope this helps.
For a quick and dirty hack, I used the Ruby filter with the code below; there is no need to use the out-of-the-box json filter anymore:
```
input {
  stdin { }
}
filter {
  grok {
    match => ["message", "(?<json_raw>.*)"]
  }
  ruby {
    init => "
      def parse_json obj, pname=nil, event
        obj = JSON.parse(obj) unless obj.is_a? Hash
        obj = obj.to_hash unless obj.is_a? Hash
        obj.each {|k,v|
          p = pname.nil? ? k : pname
          if v.is_a? Array
            v.each_with_index {|oo,ii|
              parse_json_array(oo,ii,p,event)
            }
          elsif v.is_a? Hash
            parse_json(v,p,event)
          else
            p = pname.nil? ? k : [pname,k].join('.')
            event[p] = v
          end
        }
      end

      def parse_json_array obj, i, pname, event
        obj = JSON.parse(obj) unless obj.is_a? Hash
        pname_ = pname
        if obj.is_a? Hash
          obj.each {|k,v|
            p = [pname_,i,k].join('.')
            if v.is_a? Array
              v.each_with_index {|oo,ii|
                parse_json_array(oo,ii,p,event)
              }
            elsif v.is_a? Hash
              parse_json(v,p,event)
            else
              event[p] = v
            end
          }
        else
          n = [pname_, i].join('.')
          event[n] = obj
        end
      end
    "
    code => "parse_json(event['json_raw'].to_s,nil,event) if event['json_raw'].to_s.include? ':'"
  }
}
output {
  stdout { codec => rubydebug }
}
```
Test JSON structure:

```json
{"id":123, "members":[{"i":1, "arr":[{"ii":11},{"ii":22}]},{"i":2}], "im_json":{"id":234, "members":[{"i":3},{"i":4}]}}
```
and this is the output:
```
{
               "message" => "{\"id\":123, \"members\":[{\"i\":1, \"arr\":[{\"ii\":11},{\"ii\":22}]},{\"i\":2}], \"im_json\":{\"id\":234, \"members\":[{\"i\":3},{\"i\":4}]}}",
              "@version" => "1",
            "@timestamp" => "2014-07-25T00:06:00.814Z",
                  "host" => "Leis-MacBook-Pro.local",
              "json_raw" => "{\"id\":123, \"members\":[{\"i\":1, \"arr\":[{\"ii\":11},{\"ii\":22}]},{\"i\":2}], \"im_json\":{\"id\":234, \"members\":[{\"i\":3},{\"i\":4}]}}",
                    "id" => 123,
           "members.0.i" => 1,
    "members.0.arr.0.ii" => 11,
    "members.0.arr.1.ii" => 22,
           "members.1.i" => 2,
               "im_json" => 234,
           "im_json.0.i" => 3,
           "im_json.1.i" => 4
}
```
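The dotted-key flattening shown above can also be sketched as one small standalone Ruby function, independent of Logstash (the name flatten_json is hypothetical; unlike the output above, this simplified version also prefixes keys inside nested hashes):

```ruby
require 'json'

# Recursively walk the structure, joining hash keys and array
# indexes with '.' into a single flat hash of scalar values.
def flatten_json(obj, prefix = nil, out = {})
  case obj
  when Hash
    obj.each { |k, v| flatten_json(v, prefix ? "#{prefix}.#{k}" : k.to_s, out) }
  when Array
    obj.each_with_index { |v, i| flatten_json(v, "#{prefix}.#{i}", out) }
  else
    out[prefix] = obj
  end
  out
end

doc = JSON.parse('{"id":123, "members":[{"i":1, "arr":[{"ii":11},{"ii":22}]},{"i":2}]}')
flat = flatten_json(doc)
# flat["id"] == 123, flat["members.0.arr.1.ii"] == 22, flat["members.1.i"] == 2
```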
The solution I liked is the Ruby filter, because it does not require us to write another filter. However, that solution creates fields at the "root" of the JSON, and it is hard to keep track of how the original document looked.

I came up with something similar that is easier to follow and recursive, so it is cleaner:
```
ruby {
  init => "
    def arrays_to_hash(h)
      h.each do |k,v|
        # If v is nil, an array is being iterated and the value is k.
        # If v is not nil, a hash is being iterated and the value is v.
        value = v || k
        if value.is_a?(Array)
          # 'value' is replaced with 'value_hash' later.
          value_hash = {}
          value.each_with_index do |v, i|
            value_hash[i.to_s] = v
          end
          h[k] = value_hash
        end
        if value.is_a?(Hash) || value.is_a?(Array)
          arrays_to_hash(value)
        end
      end
    end
  "
  code => "arrays_to_hash(event.to_hash)"
}
```
It converts arrays to hashes, with each index number as the key. More details: http://blog.abhijeetr.com/2016/11/logstashelasticsearch-best-way-to.html
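The same array-to-hash idea can be written as a pure function that returns a new structure instead of mutating the event in place (the name indexify is hypothetical, for illustration only):

```ruby
require 'json'

# Recursively convert every array into a hash keyed by the
# stringified element index; hashes and scalars pass through.
def indexify(obj)
  case obj
  when Array
    obj.each_with_index.to_h { |v, i| [i.to_s, indexify(v)] }
  when Hash
    obj.transform_values { |v| indexify(v) }
  else
    obj
  end
end

doc = JSON.parse('{"id":123, "members":[{"i":1},{"i":2}]}')
indexify(doc)
# => {"id"=>123, "members"=>{"0"=>{"i"=>1}, "1"=>{"i"=>2}}}
```

A pure function like this is easier to unit-test than the in-place version, at the cost of allocating a new structure per event.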