Beat input in Logstash is losing fields


The answer provided by @baudsp was mostly correct, but incomplete. I had exactly the same problem, and I was using exactly the filter mentioned in the documentation (and in @baudsp's answer), yet documents in Elasticsearch still did not contain any of the expected fields.

I finally found the problem: because I had Filebeat configured to send Nginx logs via the Nginx module and not the Log input, the data coming from Filebeat didn't quite match what the example Logstash filter was expecting.

The conditional in the example is if [fileset][module] == "nginx", which is correct when Filebeat sends data from a Log input. However, since the log data comes from the Nginx module, the fileset field doesn't contain a module property, so that conditional never matches.

To make the filter work with data coming from the Nginx module, the conditional needs to test something else. I found [event][module] to work in place of [fileset][module].
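To see the difference in isolation, here's a minimal sketch (the tags are just placeholders so each branch has a body; the real grok/date/useragent/geoip filters from the documented example go there):

filter {
  # The documented example tests [fileset][module], which is only populated
  # when Filebeat ships the files through a plain Log input.
  if [fileset][module] == "nginx" {
    mutate { add_tag => ["nginx_via_log_input"] }
  }

  # Events produced by the Filebeat Nginx module identify themselves via
  # [event][module] instead, so this is the branch that actually matches.
  if [event][module] == "nginx" {
    mutate { add_tag => ["nginx_via_module"] }
  }
}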

The working filter:

filter {
  if [event][module] == "nginx" {
    if [fileset][name] == "access" {
      grok {
        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time]"
      }
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }
}
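If you want to verify that the fields are actually being extracted before they reach Elasticsearch, a temporary stdout output with the rubydebug codec will print every parsed event (this is just a debugging aid, not part of the filter above):

output {
  # Prints each event with all of its fields so you can check that the
  # [nginx][access] (or [nginx][error]) fields are populated as expected.
  stdout { codec => rubydebug }
}

You can also check the pipeline for syntax errors with bin/logstash -f <your-pipeline>.conf --config.test_and_exit before restarting Logstash.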

Now, documents in Elasticsearch have all of the expected fields. (Screenshot: an Nginx access log entry in Elasticsearch.)

Note: You'll have the same problem with other Filebeat modules, too. Just use [event][module] in place of [fileset][module].


From your Logstash configuration, it doesn't look like you are parsing the log message.

There's an example in the Logstash documentation of how to parse Nginx logs:

Nginx Logs

The Logstash pipeline configuration in this example shows how to ship and parse access and error logs collected by the nginx Filebeat module.

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if [fileset][module] == "nginx" {
    if [fileset][name] == "access" {
      grok {
        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time]"
      }
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }
}
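Note that the example above only covers the input and filter stages. Assuming you're indexing into Elasticsearch, a typical Beats output looks something like the sketch below; the hosts value and index pattern are placeholders to adapt to your setup (the @metadata fields are set by the beats input):

output {
  elasticsearch {
    # Adjust the hosts for your cluster. The index pattern mirrors the
    # filebeat-<version>-<date> naming that Filebeat would use on its own.
    hosts => ["localhost:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}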

I know this doesn't address why Filebeat isn't sending the full object to Logstash, but it should give you a start on how to parse the Nginx logs in Logstash.