In Logstash, how do I limit the depth of JSON properties in my logs that are turned into Index fields in Elasticsearch?


I never ultimately found a way to limit the depth of the automatic field creation. I also posted my question in the Elastic forums and never got an answer. Between the time of my post and now, I have learned a lot more about Logstash.

My ultimate solution was to extract the JSON properties that I needed as fields and then use the GREEDYDATA pattern in a grok filter to place the rest of the properties into an unextractedJson field, so that I could still query for values within that field in Elasticsearch.

Here is my new Filebeat configuration (minus the comments):

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - d:/clients/company-here/rpms/logs/rpmsdev/*.json
  #json.keys_under_root: true
  json.add_error_key: true

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

setup.kibana:

output.logstash:
  hosts: ["localhost:5044"]

Note that I commented out the json.keys_under_root setting; with it disabled, Filebeat places the decoded JSON log entry under a json field that is sent on to Logstash.
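To illustrate the difference (a hypothetical, trimmed event, not taken from the actual logs): with json.keys_under_root left commented out, the decoded properties arrive nested under a json object, which is what the Logstash filter below relies on; enabling the setting would instead place those properties at the top level of the event.

{
  "source": "d:/clients/company-here/rpms/logs/rpmsdev/actionsCurrent.json",
  "json": {
    "time": "2018-09-13T13:36:45.376",
    "level": "DEBUG",
    "eventProperties": { "logAction": { "verb": "POST" } }
  }
}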

Here is a snippet of my new Logstash pipeline configuration:

#...
filter {

    ###########################################################################
    # common date time extraction
    date {
        match => ["[json][time]", "ISO8601"]
        remove_field => ["[json][time]"]
    }

    ###########################################################################
    # configuration for the actions log
    if [source] =~ /actionsCurrent.json/ {

        if ("" in [json][eventProperties][logAction][performedByUserName]) {
            mutate {
                add_field => {
                    "performedByUserName" => "%{[json][eventProperties][logAction][performedByUserName]}"
                    "performedByFullName" => "%{[json][eventProperties][logAction][performedByFullName]}"
                }
                remove_field => [
                    "[json][eventProperties][logAction][performedByUserName]",
                    "[json][eventProperties][logAction][performedByFullName]"]
            }
        }

        mutate {
            add_field => {
                "logFile" => "actions"
                "logger" => "%{[json][logger]}"
                "level" => "%{[json][level]}"
                "performedAt" => "%{[json][eventProperties][logAction][performedAt]}"
                "verb" => "%{[json][eventProperties][logAction][verb]}"
                "url" => "%{[json][eventProperties][logAction][url]}"
                "controller" => "%{[json][eventProperties][logAction][controller]}"
                "action" => "%{[json][eventProperties][logAction][action]}"
                "actionDescription" => "%{[json][eventProperties][logAction][actionDescription]}"
                "statusCode" => "%{[json][eventProperties][logAction][statusCode]}"
                "status" => "%{[json][eventProperties][logAction][status]}"
            }
            remove_field => [
                "[json][logger]",
                "[json][level]",
                "[json][eventProperties][logAction][performedAt]",
                "[json][eventProperties][logAction][verb]",
                "[json][eventProperties][logAction][url]",
                "[json][eventProperties][logAction][controller]",
                "[json][eventProperties][logAction][action]",
                "[json][eventProperties][logAction][actionDescription]",
                "[json][eventProperties][logAction][statusCode]",
                "[json][eventProperties][logAction][status]",
                "[json][logAction]",
                "[json][message]"
            ]
        }

        mutate {
            convert => {
                "statusCode" => "integer"
            }
        }

        grok {
            match => { "json" => "%{GREEDYDATA:unextractedJson}" }
            remove_field => ["json"]
        }
    }
# ...

Note the add_field configuration options in the mutate filters that extract the properties into named fields, followed by the remove_field configuration options that remove those properties from the JSON. At the end of the filter snippet, notice the grok filter that gobbles up the rest of the JSON and places it in the unextractedJson field. (Because grok coerces the json object to a string, the value of unextractedJson ends up in Ruby hash notation with => separators rather than raw JSON, as you can see in the example output below.) Finally, and most importantly, I remove the json field that was provided by Filebeat. That last bit saves me from exposing all that JSON data to Elasticsearch/Kibana.

This solution takes log entries that look like this:

{ "time": "2018-09-13T13:36:45.376", "level": "DEBUG", "logger": "RPMS.WebAPI.Filters.LogActionAttribute", "message": "Log Action: RPMS.WebAPI.Entities.LogAction", "eventProperties": {"logAction": {"logActionId":26270372,"performedByUserId":"83fa1d72-fac2-4184-867e-8c2935a262e6","performedByUserName":"rpmsadmin@domain.net","performedByFullName":"Super Admin","clientIpAddress":"::1","controller":"Account","action":"Logout","actionDescription":"Logout.","url":"http://localhost:49399/api/Account/Logout","verb":"POST","statusCode":200,"status":"OK","request":null,"response":null,"performedAt":"2018-09-13T13:36:45.3707739-05:00"}}, "logAction": "RPMS.WebAPI.Entities.LogAction" }

And turns them into Elasticsearch documents that look like this:

{  "_index": "actions-2018.09.13",  "_type": "doc",  "_id": "xvA41GUBIzzhuC5epTZG",  "_version": 1,  "_score": null,  "_source": {    "level": "DEBUG",    "tags": [      "beats_input_raw_event"    ],    "@timestamp": "2018-09-13T18:36:45.376Z",    "status": "OK",    "unextractedJson": "{\"eventProperties\"=>{\"logAction\"=>{\"performedByUserId\"=>\"83fa1d72-fac2-4184-867e-8c2935a262e6\", \"logActionId\"=>26270372, \"clientIpAddress\"=>\"::1\"}}}",    "action": "Logout",    "source": "d:\\path\\actionsCurrent.json",    "actionDescription": "Logout.",    "offset": 136120,    "@version": "1",    "verb": "POST",    "statusCode": 200,    "controller": "Account",    "performedByFullName": "Super Admin",    "logger": "RPMS.WebAPI.Filters.LogActionAttribute",    "input": {      "type": "log"    },    "url": "http://localhost:49399/api/Account/Logout",    "logFile": "actions",    "host": {      "name": "Development5"    },    "prospector": {      "type": "log"    },    "performedAt": "2018-09-13T13:36:45.3707739-05:00",    "beat": {      "name": "Development5",      "hostname": "Development5",      "version": "6.4.0"    },    "performedByUserName": "rpmsadmin@domain.net"  },  "fields": {    "@timestamp": [      "2018-09-13T18:36:45.376Z"    ],    "performedAt": [      "2018-09-13T18:36:45.370Z"    ]  },  "sort": [    1536863805376  ]}


The depth limit can be set per index directly in Elasticsearch.

Elasticsearch field mapping documentation: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html#mapping-limit-settings

From the docs:

index.mapping.depth.limit
The maximum depth for a field, which is measured as the number of inner objects. For instance, if all fields are defined at the root object level, then the depth is 1. If there is one object mapping, then the depth is 2, etc. Default is 20.
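For example, the limit can be lowered when an index is created (a minimal sketch; the index name my-index and the value 2 are placeholders):

PUT my-index
{
  "settings": {
    "index.mapping.depth.limit": 2
  }
}

Keep in mind that documents whose dynamically mapped fields would exceed the limit are rejected at index time rather than having their deeper properties silently dropped.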

Related answer: Limiting the nested fields in Elasticsearch