Vectorで遊ぶ

id:nekop です。この記事はOpenShift Advent Calendar 2023のエントリです。

OpenShift Loggingでは長くEFK (Elasticsearch, Fluentd, Kibana)スタックが利用されていましたが、現在のリリースはLoki Vectorスタックを標準的に利用するようになっています。Elasticsearchは2022-07に非推奨、Fluentdは2023-01に非推奨となりました。

OpenShift LoggingではVectorによるログ収集と転送がサポートされています。VectorはDatadog社により開発されたデータ転送処理ソフトウェアです。Sourceと呼ばれる入力、Transformerによる加工、Sinkと呼ばれる出力の3つのコンポーネントを利用してデータの転送処理を行います。

今回はこのVectorにフォーカスして遊んでみます。通常構成のログストアであるLokiは利用せず、クラスタ内に受信側となる別のVector podを用意してそちらにログ転送してファイル出力を行う設定で構成します。

受信側Vectorはhttp_server sourceで8080ポート待ち受け、file sinkでファイル出力する設定です。/tmp配下のファイルとして保存します。

TARGET_PROJECT=tkimura-infra

cat <<EOF > vector.toml
expire_metrics_secs = 60

[api]
enabled = true

[sources.http_server_source]
type = "http_server"
address = "0.0.0.0:8080"
decoding.codec = "json"

[sinks.file_sink]
type = "file"
inputs = [ "http_server_source" ]
path = "/tmp/vector-%Y-%m-%d.log"
encoding.codec = "json"
EOF
oc -n $TARGET_PROJECT create configmap vector-http --from-file=vector.toml
oc -n $TARGET_PROJECT create deployment vector-http --image=quay.io/openshift-logging/vector:5.8 --port=8080
oc -n $TARGET_PROJECT set volume deploy/vector-http --add --name=vector-http-config --configmap-name=vector-http --mount-path=/etc/vector
oc -n $TARGET_PROJECT expose deploy/vector-http

oc -n openshift-logging create -f - <<EOF
apiVersion: logging.openshift.io/v1
kind: ClusterLogging
metadata:
  name: instance
  namespace: openshift-logging
spec:
  collection:
    type: vector
---
apiVersion: logging.openshift.io/v1
kind: ClusterLogForwarder
metadata:
  name: instance 
  namespace: openshift-logging 
spec:
  outputs:
   - name: ext-vector-http
     type: http
     url: 'http://vector-http.$TARGET_PROJECT.svc:8080'
  pipelines:
   - name: forward-to-ext-vector-http
     inputRefs:
     - application
     - infrastructure
     outputRefs:
     - ext-vector-http
EOF

デプロイすると、受信側Vectorで転送されたログを確認することができます。

$ oc -n tkimura-infra rsh deploy/vector-http ls -la /tmp
total 131968
drwxrwxrwt. 1 root       root        35 Dec 22 07:44 .
dr-xr-xr-x. 1 root       root        39 Dec 22 07:44 ..
-rw-r--r--. 1 1000710000 root 101867631 Dec 22 08:22 vector-2023-12-22.log
$ oc -n tkimura-infra rsh deploy/vector-http tail -n2 /tmp/vector-2023-12-22.log
{"@timestamp":"2023-12-22T07:57:53.432429470Z","hostname":"ip-10-0-45-201.ap-northeast-1.compute.internal","kubernetes":{"annotations":{"openshift.io/scc":"hostaccess"},"container_id":"cri-o://465432752c76d4aff0419119d2365fbbf38cee44620ff746c974380df12cbeef","container_image":"quay.io/openshift-release-dev/ocp-release@sha256:e5128c3b0ab225e0abf9344dae504e08b82dda4885bbd047e2dbc13cc3d9879b","container_name":"cluster-version-operator","labels":{"k8s-app":"cluster-version-operator","pod-template-hash":"678d8bb44"},"namespace_id":"e0c35f25-5a1f-4682-bfa2-d69dab7418fc","namespace_labels":{"kubernetes_io_metadata_name":"openshift-cluster-version","name":"openshift-cluster-version","olm_operatorgroup_uid_32e3efbd-dac1-4364-bab3-cd37df669a1e":"","openshift_io_cluster-monitoring":"true","openshift_io_run-level":"","pod-security_kubernetes_io_audit":"privileged","pod-security_kubernetes_io_enforce":"privileged","pod-security_kubernetes_io_warn":"privileged"},"namespace_name":"openshift-cluster-version","pod_id":"4b2b3ab9-297a-4480-8857-d80244289da1","pod_ip":"10.0.45.201","pod_name":"cluster-version-operator-678d8bb44-s8c7x","pod_owner":"ReplicaSet/cluster-version-operator-678d8bb44"},"level":"info","log_type":"infrastructure","message":"I1222 07:57:53.432384       1 sync_worker.go:990] Done syncing for cloudcredential \"cluster\" (269 of 860)","openshift":{"cluster_id":"1e0f1eb9-9174-426b-8e2e-6b401fca7fbe","sequence":29292},"path":"/","source_type":"http_server","timestamp":"2023-12-22T07:57:53.454795108Z"}
{"@timestamp":"2023-12-22T07:57:53.432429470Z","hostname":"ip-10-0-45-201.ap-northeast-1.compute.internal","kubernetes":{"annotations":{"openshift.io/scc":"hostaccess"},"container_id":"cri-o://465432752c76d4aff0419119d2365fbbf38cee44620ff746c974380df12cbeef","container_image":"quay.io/openshift-release-dev/ocp-release@sha256:e5128c3b0ab225e0abf9344dae504e08b82dda4885bbd047e2dbc13cc3d9879b","container_name":"cluster-version-operator","labels":{"k8s-app":"cluster-version-operator","pod-template-hash":"678d8bb44"},"namespace_id":"e0c35f25-5a1f-4682-bfa2-d69dab7418fc","namespace_labels":{"kubernetes_io_metadata_name":"openshift-cluster-version","name":"openshift-cluster-version","olm_operatorgroup_uid_32e3efbd-dac1-4364-bab3-cd37df669a1e":"","openshift_io_cluster-monitoring":"true","openshift_io_run-level":"","pod-security_kubernetes_io_audit":"privileged","pod-security_kubernetes_io_enforce":"privileged","pod-security_kubernetes_io_warn":"privileged"},"namespace_name":"openshift-cluster-version","pod_id":"4b2b3ab9-297a-4480-8857-d80244289da1","pod_ip":"10.0.45.201","pod_name":"cluster-version-operator-678d8bb44-s8c7x","pod_owner":"ReplicaSet/cluster-version-operator-678d8bb44"},"level":"info","log_type":"infrastructure","message":"I1222 07:57:53.432416       1 sync_worker.go:975] Running sync for rolebinding \"openshift-cloud-credential-operator/cloud-credential-operator\" (270 of 860)","openshift":{"cluster_id":"1e0f1eb9-9174-426b-8e2e-6b401fca7fbe","sequence":29293},"path":"/","source_type":"http_server","timestamp":"2023-12-22T07:57:53.454795108Z"}

設定ファイルを見てみましょう。vector.tomlは大まかに以下の設定になっています。

  • kubernetes_logs source
  • journald source
  • 基本的な加工
  • openshift, kube namespaceのログとjournaldをinfrastructureに分類、その他をapplicationに分類
  • 受信側vectorへのhttp sink
$ oc -n openshift-logging extract secret/collector-config
vector.toml
run-vector.sh
$ cat vector.toml 
expire_metrics_secs = 60


[api]
enabled = true

# Logs from containers (including openshift containers)
[sources.raw_container_logs]
type = "kubernetes_logs"
max_read_bytes = 3145728
glob_minimum_cooldown_ms = 15000
auto_partial_merge = true
exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
pod_annotation_fields.pod_labels = "kubernetes.labels"
pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
pod_annotation_fields.pod_annotations = "kubernetes.annotations"
pod_annotation_fields.pod_uid = "kubernetes.pod_id"
pod_annotation_fields.pod_node_name = "hostname"
namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
rotate_wait_ms = 5000

[sources.raw_journal_logs]
type = "journald"
journal_directory = "/var/log/journal"

[sources.internal_metrics]
type = "internal_metrics"

[transforms.container_logs]
type = "remap"
inputs = ["raw_container_logs"]
source = '''
  .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
  if !exists(.level) {
    .level = "default"
    if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|<warn>') {
      .level = "warn"
    } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|<error>') {
      .level = "error"
    } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|<critical>') {
      .level = "critical"
    } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|<debug>') {
      .level = "debug"
    } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|<notice>') {
      .level = "notice"
    } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|<alert>') {
      .level = "alert"
    } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|<emergency>') {
      .level = "emergency"
    } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|<info>') {
      .level = "info"
    }
  }
  pod_name = string!(.kubernetes.pod_name)
  if starts_with(pod_name, "eventrouter-") {
    parsed, err = parse_json(.message)
    if err != null {
      log("Unable to process EventRouter log: " + err, level: "info")
    } else {
      ., err = merge(.,parsed)
      if err == null && exists(.event) && is_object(.event) {
          if exists(.verb) {
            .event.verb = .verb
            del(.verb)
          }
          .kubernetes.event = del(.event)
          .message = del(.kubernetes.event.message)
          set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp)
          del(.kubernetes.event.metadata.creationTimestamp)
        . = compact(., nullish: true)
      } else {
        log("Unable to merge EventRouter log message into record: " + err, level: "info")
      }
    }
  }
  del(.source_type)
  del(.stream)
  del(.kubernetes.pod_ips)
  del(.kubernetes.node_labels)
  del(.timestamp_end)
  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''

[transforms.drop_journal_logs]
type = "filter"
inputs = ["raw_journal_logs"]
condition = ".PRIORITY != \"7\" && .PRIORITY != 7"

[transforms.journal_logs]
type = "remap"
inputs = ["drop_journal_logs"]
source = '''
  .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
  
  .tag = ".journal.system"
  
  del(.source_type)
  del(._CPU_USAGE_NSEC)
  del(.__REALTIME_TIMESTAMP)
  del(.__MONOTONIC_TIMESTAMP)
  del(._SOURCE_REALTIME_TIMESTAMP)
  del(.JOB_RESULT)
  del(.JOB_TYPE)
  del(.TIMESTAMP_BOOTTIME)
  del(.TIMESTAMP_MONOTONIC)
  
  if .PRIORITY == "8" || .PRIORITY == 8 {
    .level = "trace"
  } else {
    priority = to_int!(.PRIORITY)
    .level, err = to_syslog_level(priority)
    if err != null {
        log("Unable to determine level from PRIORITY: " + err, level: "error")
        log(., level: "error")
        .level = "unknown"
    } else {
        del(.PRIORITY)
    }
  }
  
  .hostname = del(.host)
  
  # systemd’s kernel-specific metadata.
  # .systemd.k = {}
  if exists(.KERNEL_DEVICE) { .systemd.k.KERNEL_DEVICE = del(.KERNEL_DEVICE) }
  if exists(.KERNEL_SUBSYSTEM) { .systemd.k.KERNEL_SUBSYSTEM = del(.KERNEL_SUBSYSTEM) }
  if exists(.UDEV_DEVLINK) { .systemd.k.UDEV_DEVLINK = del(.UDEV_DEVLINK) }
  if exists(.UDEV_DEVNODE) { .systemd.k.UDEV_DEVNODE = del(.UDEV_DEVNODE) }
  if exists(.UDEV_SYSNAME) { .systemd.k.UDEV_SYSNAME = del(.UDEV_SYSNAME) }
  
  # trusted journal fields, fields that are implicitly added by the journal and cannot be altered by client code.
  .systemd.t = {}
  if exists(._AUDIT_LOGINUID) { .systemd.t.AUDIT_LOGINUID = del(._AUDIT_LOGINUID) }
  if exists(._BOOT_ID) { .systemd.t.BOOT_ID = del(._BOOT_ID) }
  if exists(._AUDIT_SESSION) { .systemd.t.AUDIT_SESSION = del(._AUDIT_SESSION) }
  if exists(._CAP_EFFECTIVE) { .systemd.t.CAP_EFFECTIVE = del(._CAP_EFFECTIVE) }
  if exists(._CMDLINE) { .systemd.t.CMDLINE = del(._CMDLINE) }
  if exists(._COMM) { .systemd.t.COMM = del(._COMM) }
  if exists(._EXE) { .systemd.t.EXE = del(._EXE) }
  if exists(._GID) { .systemd.t.GID = del(._GID) }
  if exists(._HOSTNAME) { .systemd.t.HOSTNAME = .hostname }
  if exists(._LINE_BREAK) { .systemd.t.LINE_BREAK = del(._LINE_BREAK) }
  if exists(._MACHINE_ID) { .systemd.t.MACHINE_ID = del(._MACHINE_ID) }
  if exists(._PID) { .systemd.t.PID = del(._PID) }
  if exists(._SELINUX_CONTEXT) { .systemd.t.SELINUX_CONTEXT = del(._SELINUX_CONTEXT) }
  if exists(._SOURCE_REALTIME_TIMESTAMP) { .systemd.t.SOURCE_REALTIME_TIMESTAMP = del(._SOURCE_REALTIME_TIMESTAMP) }
  if exists(._STREAM_ID) { .systemd.t.STREAM_ID = ._STREAM_ID }
  if exists(._SYSTEMD_CGROUP) { .systemd.t.SYSTEMD_CGROUP = del(._SYSTEMD_CGROUP) }
  if exists(._SYSTEMD_INVOCATION_ID) {.systemd.t.SYSTEMD_INVOCATION_ID = ._SYSTEMD_INVOCATION_ID}
  if exists(._SYSTEMD_OWNER_UID) { .systemd.t.SYSTEMD_OWNER_UID = del(._SYSTEMD_OWNER_UID) }
  if exists(._SYSTEMD_SESSION) { .systemd.t.SYSTEMD_SESSION = del(._SYSTEMD_SESSION) }
  if exists(._SYSTEMD_SLICE) { .systemd.t.SYSTEMD_SLICE = del(._SYSTEMD_SLICE) }
  if exists(._SYSTEMD_UNIT) { .systemd.t.SYSTEMD_UNIT = del(._SYSTEMD_UNIT) }
  if exists(._SYSTEMD_USER_UNIT) { .systemd.t.SYSTEMD_USER_UNIT = del(._SYSTEMD_USER_UNIT) }
  if exists(._TRANSPORT) { .systemd.t.TRANSPORT = del(._TRANSPORT) }
  if exists(._UID) { .systemd.t.UID = del(._UID) }
  
  # fields that are directly passed from clients and stored in the journal.
  .systemd.u = {}
  if exists(.CODE_FILE) { .systemd.u.CODE_FILE = del(.CODE_FILE) }
  if exists(.CODE_FUNC) { .systemd.u.CODE_FUNCTION = del(.CODE_FUNC) }
  if exists(.CODE_LINE) { .systemd.u.CODE_LINE = del(.CODE_LINE) }
  if exists(.ERRNO) { .systemd.u.ERRNO = del(.ERRNO) }
  if exists(.MESSAGE_ID) { .systemd.u.MESSAGE_ID = del(.MESSAGE_ID) }
  if exists(.SYSLOG_FACILITY) { .systemd.u.SYSLOG_FACILITY = del(.SYSLOG_FACILITY) }
  if exists(.SYSLOG_IDENTIFIER) { .systemd.u.SYSLOG_IDENTIFIER = del(.SYSLOG_IDENTIFIER) }
  if exists(.SYSLOG_PID) { .systemd.u.SYSLOG_PID = del(.SYSLOG_PID) }
  if exists(.RESULT) { .systemd.u.RESULT = del(.RESULT) }
  if exists(.UNIT) { .systemd.u.UNIT = del(.UNIT) }
  
  .time = format_timestamp!(.timestamp, format: "%FT%T%:z")
  
  ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
'''

[transforms.route_container_logs]
type = "route"
inputs = ["container_logs"]
route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))'
route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")'

# Set log_type to "application"
[transforms.application]
type = "remap"
inputs = ["route_container_logs.app"]
source = '''
  .log_type = "application"
'''

# Set log_type to "infrastructure"
[transforms.infrastructure]
type = "remap"
inputs = ["route_container_logs.infra","journal_logs"]
source = '''
  .log_type = "infrastructure"
'''

[transforms.forward_to_ext_vector_http_user_defined]
type = "remap"
inputs = ["application","infrastructure"]
source = '''
  .
'''

[transforms.ext_vector_http_normalize_http]
type = "remap"
inputs = ["forward_to_ext_vector_http_user_defined"]
source = '''
  del(.file)
'''

[transforms.ext_vector_http_dedot]
type = "lua"
inputs = ["ext_vector_http_normalize_http"]
version = "2"
hooks.init = "init"
hooks.process = "process"
source = '''
    function init()
        count = 0
    end
    function process(event, emit)
        count = count + 1
        event.log.openshift.sequence = count
        if event.log.kubernetes == nil then
            emit(event)
            return
        end
        if event.log.kubernetes.labels == nil then
            emit(event)
            return
        end
        dedot(event.log.kubernetes.namespace_labels)
        dedot(event.log.kubernetes.labels)
        emit(event)
    end
    
    function dedot(map)
        if map == nil then
            return
        end
        local new_map = {}
        local changed_keys = {}
        for k, v in pairs(map) do
            local dedotted = string.gsub(k, "[./]", "_")
            if dedotted ~= k then
                new_map[dedotted] = v
                changed_keys[k] = true
            end
        end
        for k in pairs(changed_keys) do
            map[k] = nil
        end
        for k, v in pairs(new_map) do
            map[k] = v
        end
    end
'''

[sinks.ext_vector_http]
type = "http"
inputs = ["ext_vector_http_dedot"]
uri = "http://vector-http.tkimura-infra.svc:8080"
method = "post"

[sinks.ext_vector_http.encoding]
codec = "json"

[sinks.ext_vector_http.buffer]
when_full = "drop_newest"

[sinks.ext_vector_http.request]
retry_attempts = 17
timeout_secs = 10


# Bearer Auth Config
[sinks.ext_vector_http.auth]
strategy = "bearer"
token = "eyJhbGciOiJSUzI1NiIsImtpZCI6Im03RElRbTF6ZXdvU01mQ3lRcGZUUTZWUXRlVVJ1WHJtdWlVaUo0Y0tmSUUifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJvcGVuc2hpZnQtbG9nZ2luZyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJsb2djb2xsZWN0b3ItdG9rZW4iLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoibG9nY29sbGVjdG9yIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiMjk1N2NjNDAtZTRjNS00YzMzLWE2MGQtMGFiZDU5ZGRjMGJlIiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50Om9wZW5zaGlmdC1sb2dnaW5nOmxvZ2NvbGxlY3RvciJ9.OTVa3zjpdEUCPmQ2vru3UexUBsp63cb6wZWOLdM0cPKAYRj-clibbFa36WBV9OG__L0ujRVnPBBetkh1r9QwO4HpOPCi7owlcDWn_yOc3_tEVsWge5LZT28QEVm2vQh4n_sON1HRcwmnYKqJVMOStpD6WjIW9tIADai9RlW-d47iumC6j8eqXVF1zY-N09jkLAA3ZuBXqBpPmI4W24FZOlPmEAEGwE--H8dYxUjwheD12e1EntYBOjRpnUb9Sx86GqOnG1LIbfb3jv0MbndOFs7kIbus2Xc516OaHoFJfEoxl-SMxGdKcCOtm38S5ikpsfEC6LfcapTV0mg5JitpgOJI9QiUDwZ8ClOKk0z1JxNQQJcJPjg1zy0OjqZrSeIS4TqdzLsAUrA-tuu7HWpbKlwuNVNiLBqKw8N3JqTTEK3j0WeVvEYlBacZsyCBpfH8Vt8234r9ckbs67278tniUmQ5Y-4Y8ybEUjp6cIcs28T13DGfQmWOftlBgltTAmvpXAUCgBu6bTPXZLXCCmCs5VzTfK8W9NcVjOp-fmyIYVm2uBY1mSw5WXIP15I8FCsa22zzMXlGPU1IIDQuNQJCFvGQxeeWa0rZP5T_8wxjhi6tScJHh_XTKcx8sxKJHagYmIvYfF39oI0ZCo_pdCTI5EZ990JiFch5-dVv3zBzIyc"

[transforms.add_nodename_to_metric]
type = "remap"
inputs = ["internal_metrics"]
source = '''
.tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME")
'''

[sinks.prometheus_output]
type = "prometheus_exporter"
inputs = ["add_nodename_to_metric"]
address = "[::]:24231"
default_namespace = "collector"

[sinks.prometheus_output.tls]
enabled = true
key_file = "/etc/collector/metrics/tls.key"
crt_file = "/etc/collector/metrics/tls.crt"
min_tls_version = "VersionTLS12"
ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"

また、以前のFluentdではディスクバッファを利用する設定でしたが、Vectorではディスクを利用しないメモリバッファ設定となります。理由はいくつかあり、リソース消費が大きく遅い、遅いのでログが消失する、単に無駄、というものが大きな理由です。

ディスクバッファ設定では、ディスクバッファに一度書いてから転送してバッファ削除、という処理となります。入力がネットワーク受信したログなどであれば合理的な構成ですが、入力がディスク上のログなので、それを読み込んでもう一度ディスクに書き込むのは大きな無駄です。基本的には元となるコンテナログの保持サイズ、期間を伸ばせば済むところです。当然処理が多くなるので、パフォーマンスが低く、CPUやメモリの消費量が大きくなり、かつ転送処理が大幅に遅くなります。ログ出力レートが高いと転送が追いつかなくなり、バッファがフルになり入力がブロックされ(もしくはダイレクトにログ転送破棄が行われ)、その間にもログローテーションが発生すると転送されないログが発生してログ消失となります。ログ出力が多い、nodeあたりPod数が多い、などの条件で顕著にログ消失が発生してしまいます。言い換えると、Fluentd構成でログ消失をなるべく防ぎたい、という場合はノード一台あたりのログ出力レートを小さく保つ必要があるため、多数のコンテナが稼働する大きいサイズのノードを利用することができなくなります。

対してメモリバッファでは単純な強制終了で少量のログが転送されず消失するリスクがある、という点はありますが、上記の問題はすべて回避でき、また以前のFluentdの構成と比べて何十倍という単位で高速で動作するので、リスクとなる「転送処理中のログ」自体が非常に小さく保てます。Vectorにもdisk bufferオプションはありますが、500msインターバルでメモリを永続化する非同期のモデルとなっており、同期ではないので、ディスクバッファ設定を行ったとしても耐障害性ではメモリバッファとあまり大差はなさそうです。

たとえばもし強制終了が多く発生する、そのときにログ転送のログ消失は防止したい、といった特殊な前提条件があるのであれば、Vectorはその要件を満たすことはできなさそうですので、OpenShift LoggingのVectorではなく自前でログ収集エージェントを選定して設定したほうが良いでしょう。

検証していても、以前のFluentd設定では初期設定からログが収集されるまで小一時間待つ必要がありましたが、Vectorでは一瞬で全ログが転送されるのが便利です。

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。