usert4jju7

Fluentd - Json within the log field is enclosed by double quotes

We have a formatting issue with logs forwarded through Fluentd: a backslash is being added before every double quote inside the log field.

Example -

2022-02-14T10:17:46+13:00     myapp            {"log":"{\"name\":\"contents\",\"hostname\":\"vcr-amyapp1-yyut-4uh57vb-rr73g\",\"pid\":876265,\"level\":20,\"req_id\":\"1644787066643:vcr-myapp1-03-e263f-v4.0.5:876265:kwljxg59:30317\",\"data\":{\"method\":\"get\",\"url\":\"/api/content/heartbeat\",\"agent\":\"Go-http-client/1.1\"},\"msg\":\"\",\"time\":\"2022-02-13T21:17:46.644Z\",\"v\":0}","container_name":"vcr-myapp1-03-e263f"}

The backslashes invalidate the JSON inside the log field. That is, downstream consumers treat the log field as a plain string instead of parsed JSON. We need the contents of the log field to be valid, queryable JSON as well.
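
To see why the quotes are escaped: the log value is itself a JSON-encoded string, so the outer serializer has to escape every inner quote, and decoding the field once recovers the object. A minimal sketch with an abbreviated version of the record above:

```python
import json

# Abbreviated record: "log" holds a JSON-encoded string, so its inner
# double quotes arrive escaped with backslashes.
raw = '{"log":"{\\"name\\":\\"contents\\",\\"level\\":20}","container_name":"vcr-myapp1-03-e263f"}'

record = json.loads(raw)
print(type(record["log"]).__name__)   # str: the inner JSON is still a string
inner = json.loads(record["log"])     # a second decode yields the object
print(inner["name"], inner["level"])  # contents 20
```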

What should be changed in the Fluentd config so that the backslashes are not added? I've spent a week digging into this and still haven't found anything helpful.

I would truly appreciate any help please.

Current fluentd config file below -

<system>
  workers 1
</system>

<source>
  @type  forward
  @id    input1
  @label @mainstream
  port  24224
</source>

# Used for docker health check: healthcheck http://localhost:5000/healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
# The query parameter in the URL defines a URL-encoded JSON object that looks like this:
# {"log": "health check"}
# The container health check emits a log message of "health check". The query parameter defines the log message, while the path (/healthcheck) sets the tag. In Fluentd, log messages are tagged, which allows them to be routed to different destinations.
<source>
  @type http
  port 5000
  bind 0.0.0.0
</source>
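
The URL-encoded health-check payload in the comment above can be verified with a quick decode (`unquote_plus` turns `+` into a space and expands the `%XX` escapes):

```python
import json
from urllib.parse import unquote_plus

# The query string from the health-check URL in the comment above.
query = "%7B%22log%22%3A+%22health+check%22%7D"

decoded = unquote_plus(query)   # '+' becomes a space, %XX is decoded
print(decoded)                  # {"log": "health check"}
assert json.loads(decoded) == {"log": "health check"}
```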

# records sent for health checking won't be forwarded anywhere
<match health*>
  @type null
</match>

<label @mainstream>
  <filter **>
    @type record_modifier
    remove_keys container_id,source
  </filter>
  <match **>
    @type copy
    <store>
      @type file
      @id   output1
      path         /fluentd/log/data.*.log
      symlink_path /fluentd/log/data.log
      append       true
      time_slice_format %Y%m%d
      time_slice_wait   10m
      time_format       %Y%m%dT%H%M%S%z
    </store>
    <store>
      @type elasticsearch
      host {{ env "efk__elasticsearch_host" }}
      port {{ env "efk__elasticsearch_port" }}
      logstash_format true
      logstash_prefix fluentd
      logstash_dateformat %Y%m%d
      include_tag_key true
      type_name access_log
      tag_key @log_name
      flush_interval 1s
    </store>
{{ if eq (env "efk__fluent_splunk_hec_enabled") "true" }}
    <store>
      @type splunk_hec
      protocol {{ env "efk__fluent_splunk_hec_protocol" }}
      hec_host {{ env "efk__fluent_splunk_hec_host" }}
      hec_port {{ env "efk__fluent_splunk_hec_port" }}
      {{ if env "efk__fluent_splunk_hec_token" | regexMatch "^secret/.*" -}}
      hec_token {{ with printf "%s" (env "efk__fluent_splunk_hec_token") | secret }}{{ .Data.value }}{{ end }}
      {{ else -}}
      hec_token {{ env "efk__fluent_splunk_hec_token" }}
      {{ end }}
      sourcetype ${tag}
    </store>
{{ end }}
  </match>
</label>


Answers (1)

usert4jju7

Posting the answer in case this helps someone.

Had to add a few things to get this to work. The backslashes themselves are not a bug: the application writes its JSON as a string into the log field, so Fluentd escapes the inner quotes when it serializes the record. The fix is to parse that string back into JSON with the parser filter.

The filter section needed to be split into two filters, since a single <filter> block cannot carry two @type directives:

  <filter **>
    @type record_modifier
    remove_keys container_id,source
  </filter>
  <filter **>
    @type parser
    key_name log
    hash_value_field log
    <parse>
      @type json
    </parse>
  </filter>
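
What the parser filter does to each record can be sketched in Python. This is a rough emulation, not the plugin itself; note that with reserve_data left at its default of false, filter_parser keeps only the parsed result, so fields like container_name are dropped unless reserve_data true is added to the filter:

```python
import json

def emulate_parser_filter(record, key_name="log", hash_value_field="log",
                          reserve_data=False):
    """Rough emulation of Fluentd's parser filter with <parse> @type json:
    record[key_name] is decoded and nested under hash_value_field.
    With reserve_data False (the default) the other original fields
    are dropped; with True they are kept."""
    parsed = json.loads(record[key_name])
    out = dict(record) if reserve_data else {}
    out.pop(key_name, None)
    out[hash_value_field] = parsed
    return out

record = {
    "log": '{"name":"contents","level":20}',
    "container_name": "vcr-myapp1-03-e263f",
}
print(emulate_parser_filter(record))
# {'log': {'name': 'contents', 'level': 20}}
print(emulate_parser_filter(record, reserve_data=True))
# {'container_name': 'vcr-myapp1-03-e263f', 'log': {'name': 'contents', 'level': 20}}
```

If container_name disappears from the output after adding the filter, setting reserve_data true inside the parser filter block keeps the original fields alongside the parsed log object.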

Complete config below -

<system>
  workers 1
</system>

<source>
  @type  forward
  @id    input1
  @label @mainstream
  port  24224
</source>

# Used for docker health check: healthcheck http://localhost:5000/healthcheck?json=%7B%22log%22%3A+%22health+check%22%7D
# The query parameter in the URL defines a URL-encoded JSON object that looks like this:
# {"log": "health check"}
# The container health check emits a log message of "health check". The query parameter defines the log message, while the path (/healthcheck) sets the tag. In Fluentd, log messages are tagged, which allows them to be routed to different destinations.
<source>
  @type http
  port 5000
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health*>
  @type null
</match>

<label @mainstream>
  <filter **>
    @type record_modifier
    remove_keys container_id,source
  </filter>
  <filter **>
    @type parser
    key_name log
    hash_value_field log
    <parse>
      @type json
    </parse>
  </filter>
  <match **>
    @type copy
    <store>
      @type file
      @id   output1
      path         /fluentd/log/data.*.log
      symlink_path /fluentd/log/data.log
      append       true
      time_slice_format %Y%m%d
      time_slice_wait   10m
      time_format       %Y%m%dT%H%M%S%z
    </store>
    <store>
      @type elasticsearch
      host {{ env "efk__elasticsearch_host" }}
      port {{ env "efk__elasticsearch_port" }}
      logstash_format true
      logstash_prefix fluentd
      logstash_dateformat %Y%m%d
      include_tag_key true
      type_name access_log
      tag_key @log_name
      flush_interval 1s
    </store>
{{ if eq (env "efk__fluent_splunk_hec_enabled") "true" }}
    <store>
      @type splunk_hec
      protocol {{ env "efk__fluent_splunk_hec_protocol" }}
      hec_host {{ env "efk__fluent_splunk_hec_host" }}
      hec_port {{ env "efk__fluent_splunk_hec_port" }}
      {{ if env "efk__fluent_splunk_hec_token" | regexMatch "^secret/.*" -}}
      hec_token {{ with printf "%s" (env "efk__fluent_splunk_hec_token") | secret }}{{ .Data.value }}{{ end }}
      {{ else -}}
      hec_token {{ env "efk__fluent_splunk_hec_token" }}
      {{ end }}
      sourcetype ${tag}
    </store>
{{ end }}
  </match>
</label>
