fluentd not parsing JSON log file entry
This config worked for me:
<source>
  @type tail
  path /var/log/containers/*.log,/var/log/containers/*.log
  pos_file /opt/bitnami/fluentd/logs/buffers/fluentd-docker.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type json
    time_key time
    time_format %iso8601
  </parse>
</source>

<filter kubernetes.**>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

<filter kubernetes.**>
  @type kubernetes_metadata
</filter>
Make sure to edit path so that it matches your use case.
This happens because Docker writes container logs to /var/log/containers/*.log with each line of container STDOUT stored as a string under the 'log' key. JSON emitted by the application therefore has to be serialized to a string before it is stored there. What you need is an additional step that parses the string under the 'log' key:
<filter kubernetes.**>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>
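To see why the second parse step is needed, here is a minimal Python sketch of the two decoding stages. It is only an illustration, not what Fluentd runs internally: the sample event is made up, the file layout assumes Docker's json-file logging driver, and time extraction is ignored.

```python
import json

# Hypothetical application event; the app prints it to STDOUT as JSON.
app_line = json.dumps({"severity": "INFO", "message": "hello"})

# What lands in /var/log/containers/*.log (assuming the json-file driver):
# the application output is stored as a *string* under "log".
file_line = json.dumps({
    "log": app_line + "\n",
    "stream": "stdout",
    "time": "2020-06-28T23:40:37.184Z",
})

# Stage 1: the <parse> section of the tail source decodes the outer JSON.
record = json.loads(file_line)
assert isinstance(record["log"], str)  # still a string, not an object

# Stage 2: the parser filter decodes the string under "log". This mimics
# hash_value_field "log" with reserve_data true: the parsed object is
# stored back under "log" and the other keys are kept.
parsed = dict(record)
parsed["log"] = json.loads(record["log"])

print(parsed["log"]["severity"])
```

Without stage 2, downstream tools only ever see "log" as one opaque string.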
I had JSON being emitted from my container like this:
{"asctime": "2020-06-28 23:40:37,184", "filename": "streaming_pull_manager.py", "funcName": "_should_recover", "lineno": 648, "processName": "MainProcess", "threadName": "Thread-6", "message": "Observed recoverable stream error 504 Deadline Exceeded", "severity": "INFO"}
And Kibana was showing "failed to find message". After some googling, I fixed it by appending the following to my kubernetes.conf:
<filter **>
  @type record_transformer
  <record>
    log_json ${record["log"]}
  </record>
</filter>

<filter **>
  @type parser
  @log_level debug
  key_name log_json
  reserve_data true
  remove_key_name_field true
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>
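As a rough model of what these two filters do to a single record, here is a small Python sketch. The function name apply_filters is hypothetical, and the handling of non-JSON lines is an assumption based on the reserve_data and emit_invalid_record_to_error settings, not Fluentd's actual code.

```python
import json

def apply_filters(record):
    """Approximate the record_transformer + parser filter pair."""
    out = dict(record)
    # record_transformer: copy "log" into a new "log_json" field.
    out["log_json"] = out.get("log")
    try:
        fields = json.loads(out["log_json"])
    except (TypeError, ValueError):
        # Not JSON: assumed to pass through with nothing merged in.
        return out
    if not isinstance(fields, dict):
        return out
    # remove_key_name_field true: drop "log_json" after a successful parse.
    del out["log_json"]
    # reserve_data true without hash_value_field: the parsed keys are
    # merged into the top level of the record, next to the original "log".
    out.update(fields)
    return out

sample = {
    "log": '{"message": "Observed recoverable stream error", "severity": "INFO"}',
    "stream": "stdout",
}
print(apply_filters(sample)["message"])
```

The difference from the hash_value_field variant is that fields such as message end up at the top level of the record and the original "log" string is kept.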
The final kubernetes.conf file looks like this:
<label @FLUENT_LOG>
  <match fluent.**>
    @type null
  </match>
</label>

<source>
  @type tail
  @id in_tail_container_logs
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag "#{ENV['FLUENT_CONTAINER_TAIL_TAG'] || 'kubernetes.*'}"
  exclude_path "#{ENV['FLUENT_CONTAINER_TAIL_EXCLUDE_PATH'] || use_default}"
  read_from_head true
  <parse>
    @type "#{ENV['FLUENT_CONTAINER_TAIL_PARSER_TYPE'] || 'json'}"
    time_format %Y-%m-%dT%H:%M:%S.%NZ
  </parse>
</source>

<source>
  @type tail
  @id in_tail_minion
  path /var/log/salt/minion
  pos_file /var/log/fluentd-salt.pos
  tag salt
  <parse>
    @type regexp
    expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
    time_format %Y-%m-%d %H:%M:%S
  </parse>
</source>

<source>
  @type tail
  @id in_tail_startupscript
  path /var/log/startupscript.log
  pos_file /var/log/fluentd-startupscript.log.pos
  tag startupscript
  <parse>
    @type syslog
  </parse>
</source>

<source>
  @type tail
  @id in_tail_docker
  path /var/log/docker.log
  pos_file /var/log/fluentd-docker.log.pos
  tag docker
  <parse>
    @type regexp
    expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
  </parse>
</source>

<source>
  @type tail
  @id in_tail_etcd
  path /var/log/etcd.log
  pos_file /var/log/fluentd-etcd.log.pos
  tag etcd
  <parse>
    @type none
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kubelet
  multiline_flush_interval 5s
  path /var/log/kubelet.log
  pos_file /var/log/fluentd-kubelet.log.pos
  tag kubelet
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_proxy
  multiline_flush_interval 5s
  path /var/log/kube-proxy.log
  pos_file /var/log/fluentd-kube-proxy.log.pos
  tag kube-proxy
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_apiserver
  multiline_flush_interval 5s
  path /var/log/kube-apiserver.log
  pos_file /var/log/fluentd-kube-apiserver.log.pos
  tag kube-apiserver
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_controller_manager
  multiline_flush_interval 5s
  path /var/log/kube-controller-manager.log
  pos_file /var/log/fluentd-kube-controller-manager.log.pos
  tag kube-controller-manager
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_kube_scheduler
  multiline_flush_interval 5s
  path /var/log/kube-scheduler.log
  pos_file /var/log/fluentd-kube-scheduler.log.pos
  tag kube-scheduler
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_rescheduler
  multiline_flush_interval 5s
  path /var/log/rescheduler.log
  pos_file /var/log/fluentd-rescheduler.log.pos
  tag rescheduler
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_glbc
  multiline_flush_interval 5s
  path /var/log/glbc.log
  pos_file /var/log/fluentd-glbc.log.pos
  tag glbc
  <parse>
    @type kubernetes
  </parse>
</source>

<source>
  @type tail
  @id in_tail_cluster_autoscaler
  multiline_flush_interval 5s
  path /var/log/cluster-autoscaler.log
  pos_file /var/log/fluentd-cluster-autoscaler.log.pos
  tag cluster-autoscaler
  <parse>
    @type kubernetes
  </parse>
</source>

# Example:
# 2017-02-09T00:15:57.992775796Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" ip="104.132.1.72" method="GET" user="kubecfg" as="<self>" asgroups="<lookup>" namespace="default" uri="/api/v1/namespaces/default/pods"
# 2017-02-09T00:15:57.993528822Z AUDIT: id="90c73c7c-97d6-4b65-9461-f94606ff825f" response="200"
<source>
  @type tail
  @id in_tail_kube_apiserver_audit
  multiline_flush_interval 5s
  path /var/log/kubernetes/kube-apiserver-audit.log
  pos_file /var/log/kube-apiserver-audit.log.pos
  tag kube-apiserver-audit
  <parse>
    @type multiline
    format_firstline /^\S+\s+AUDIT:/
    # Fields must be explicitly captured by name to be parsed into the record.
    # Fields may not always be present, and order may change, so this just looks
    # for a list of key="\"quoted\" value" pairs separated by spaces.
    # Unknown fields are ignored.
    # Note: We can't separate query/response lines as format1/format2 because
    # they don't always come one after the other for a given query.
    format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
    time_format %Y-%m-%dT%T.%L%Z
  </parse>
</source>

<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
  kubernetes_url "#{ENV['FLUENT_FILTER_KUBERNETES_URL'] || 'https://' + ENV.fetch('KUBERNETES_SERVICE_HOST') + ':' + ENV.fetch('KUBERNETES_SERVICE_PORT') + '/api'}"
  verify_ssl "#{ENV['KUBERNETES_VERIFY_SSL'] || true}"
  ca_file "#{ENV['KUBERNETES_CA_FILE']}"
  skip_labels "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_LABELS'] || 'false'}"
  skip_container_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_CONTAINER_METADATA'] || 'false'}"
  skip_master_url "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_MASTER_URL'] || 'false'}"
  skip_namespace_metadata "#{ENV['FLUENT_KUBERNETES_METADATA_SKIP_NAMESPACE_METADATA'] || 'false'}"
</filter>

<filter **>
  @type record_transformer
  <record>
    log_json ${record["log"]}
  </record>
</filter>

<filter **>
  @type parser
  @log_level debug
  key_name log_json
  reserve_data true
  remove_key_name_field true
  emit_invalid_record_to_error false
  <parse>
    @type json
  </parse>
</filter>
EDIT: If anyone is looking for how to overwrite fluent .conf files, especially kubernetes.conf, there is an amazing tutorial here.
I solved it with this parser filter.
Test it over HTTP first to make sure the record gets parsed, then apply it to your container logs.
fluentd.conf
<source>
  @type http
  port 5170
  bind 0.0.0.0
</source>

<filter *>
  @type parser
  key_name "$.log"
  hash_value_field "log"
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>

<match **>
  @type stdout
</match>
Then send a test record from your terminal with curl:
curl -i -X POST -d 'json={"source":"stderr","log":"{\"applicationName\":\"api-producer-go\",\"level\":\"info\",\"msg\":\"Development is Running\",\"time\":\"2020-09-04T14:32:29Z\"}","container_id":"f9975c6a7bc6dcc21dbdacca8ff98152cd04ae28b3bc36707eba5453f6ff9960","container_name":"/api-producer-golang"}' http://localhost:5170/test.cycle
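Hand-escaping the nested JSON in the curl command is easy to get wrong. As an alternative, here is a minimal Python sketch that builds the same kind of request body, using only the standard library. The function names build_form_body and post_test_record are hypothetical, and the URL simply repeats the one from the curl example.

```python
import json
import urllib.parse
import urllib.request

def build_form_body(app_event):
    """Wrap an application event the way the test record above does."""
    # The inner event is serialized first, so "log" holds a JSON *string*.
    record = {
        "source": "stderr",
        "log": json.dumps(app_event),
        "container_name": "/api-producer-golang",
    }
    # The HTTP input reads the record from a form field named "json".
    return urllib.parse.urlencode({"json": json.dumps(record)}).encode("utf-8")

def post_test_record(body, url="http://localhost:5170/test.cycle"):
    """POST the body to a locally running Fluentd; returns the HTTP status."""
    request = urllib.request.Request(url, data=body, method="POST")
    with urllib.request.urlopen(request) as response:
        return response.status

body = build_form_body({"level": "info", "msg": "Development is Running"})
print(body[:40])
```

With Fluentd running, calling post_test_record(body) should make the stdout output show "log" as a nested object rather than a string, if the parser filter is working.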