id:nekop です。この記事はOpenShift Advent Calendar 2023のエントリです。
OpenShift Loggingでは長くEFK (Elasticsearch, Fluentd, Kibana)スタックが利用されていましたが、現在のリリースはLoki Vectorスタックを標準的に利用するようになっています。Elasticsearchは2022-07に非推奨、Fluentdは2023-01に非推奨となりました。
OpenShift LoggingではVectorによるログ収集と転送がサポートされています。VectorはDatadog社により開発されたデータ転送処理ソフトウェアです。Sourceと呼ばれる入力、Transformerによる加工、Sinkと呼ばれる出力の3つのコンポーネントを利用してデータの転送処理を行います。
今回はこのVectorにフォーカスして遊んでみます。通常構成のログストアであるLokiは利用せず、クラスタ内に受信側となる別のVector podを用意してそちらにログ転送してファイル出力を行う設定で構成します。この構成における受信側Vectorはサポートされる構成ではなくユーザアプリ扱いなので、サポート対象外となります。
受信側Vectorはhttp_server sourceで8080ポート待ち受け、file sinkでファイル出力する設定です。/tmp配下のファイルとして保存します。
TARGET_PROJECT=tkimura-infra cat <<EOF > vector.toml expire_metrics_secs = 60 [api] enabled = true [sources.http_server_source] type = "http_server" address = "0.0.0.0:8080" decoding.codec = "json" [sinks.file_sink] type = "file" inputs = [ "http_server_source" ] path = "/tmp/vector-%Y-%m-%d.log" encoding.codec = "json" EOF oc -n $TARGET_PROJECT create configmap vector-http --from-file=vector.toml oc -n $TARGET_PROJECT create deployment vector-http --image=quay.io/openshift-logging/vector:5.8 --port=8080 oc -n $TARGET_PROJECT set volume deploy/vector-http --add --name=vector-http-config --configmap-name=vector-http --mount-path=/etc/vector oc -n $TARGET_PROJECT expose deploy/vector-http oc -n openshift-logging create -f - <<EOF apiVersion: logging.openshift.io/v1 kind: ClusterLogging metadata: name: instance namespace: openshift-logging spec: collection: type: vector --- apiVersion: logging.openshift.io/v1 kind: ClusterLogForwarder metadata: name: instance namespace: openshift-logging spec: outputs: - name: ext-vector-http type: http url: 'http://vector-http.$TARGET_PROJECT.svc:8080' pipelines: - name: forward-to-ext-vector-http inputRefs: - application - infrastructure outputRefs: - ext-vector-http EOF
デプロイすると、受信側Vectorで転送されたログを確認することができます。
$ oc -n tkimura-infra rsh deploy/vector-http ls -la /tmp total 131968 drwxrwxrwt. 1 root root 35 Dec 22 07:44 . dr-xr-xr-x. 1 root root 39 Dec 22 07:44 .. -rw-r--r--. 1 1000710000 root 101867631 Dec 22 08:22 vector-2023-12-22.log $ oc -n tkimura-infra rsh deploy/vector-http tail -n2 /tmp/vector-2023-12-22.log {"@timestamp":"2023-12-22T07:57:53.432429470Z","hostname":"ip-10-0-45-201.ap-northeast-1.compute.internal","kubernetes":{"annotations":{"openshift.io/scc":"hostaccess"},"container_id":"cri-o://465432752c76d4aff0419119d2365fbbf38cee44620ff746c974380df12cbeef","container_image":"quay.io/openshift-release-dev/ocp-release@sha256:e5128c3b0ab225e0abf9344dae504e08b82dda4885bbd047e2dbc13cc3d9879b","container_name":"cluster-version-operator","labels":{"k8s-app":"cluster-version-operator","pod-template-hash":"678d8bb44"},"namespace_id":"e0c35f25-5a1f-4682-bfa2-d69dab7418fc","namespace_labels":{"kubernetes_io_metadata_name":"openshift-cluster-version","name":"openshift-cluster-version","olm_operatorgroup_uid_32e3efbd-dac1-4364-bab3-cd37df669a1e":"","openshift_io_cluster-monitoring":"true","openshift_io_run-level":"","pod-security_kubernetes_io_audit":"privileged","pod-security_kubernetes_io_enforce":"privileged","pod-security_kubernetes_io_warn":"privileged"},"namespace_name":"openshift-cluster-version","pod_id":"4b2b3ab9-297a-4480-8857-d80244289da1","pod_ip":"10.0.45.201","pod_name":"cluster-version-operator-678d8bb44-s8c7x","pod_owner":"ReplicaSet/cluster-version-operator-678d8bb44"},"level":"info","log_type":"infrastructure","message":"I1222 07:57:53.432384 1 sync_worker.go:990] Done syncing for cloudcredential \"cluster\" (269 of 860)","openshift":{"cluster_id":"1e0f1eb9-9174-426b-8e2e-6b401fca7fbe","sequence":29292},"path":"/","source_type":"http_server","timestamp":"2023-12-22T07:57:53.454795108Z"} {"@timestamp":"2023-12-22T07:57:53.432429470Z","hostname":"ip-10-0-45-201.ap-northeast-1.compute.internal","kubernetes":{"annotations":{"openshift.io/scc":"hostaccess"},"container_id":"cri-o://465432752c76d4aff0419119d2365fbbf38cee44620ff746c974380df12cbeef","container_image":"quay.io/openshift-release-dev/ocp-release@sha256:e5128c3b0ab225e0abf9344dae504e08b82dda4885bbd047e2dbc13cc3d9879b","container_name":"cluster-version-operator","labels":{"k8s-app":"cluster-version-operator","pod-template-hash":"678d8bb44"},"namespace_id":"e0c35f25-5a1f-4682-bfa2-d69dab7418fc","namespace_labels":{"kubernetes_io_metadata_name":"openshift-cluster-version","name":"openshift-cluster-version","olm_operatorgroup_uid_32e3efbd-dac1-4364-bab3-cd37df669a1e":"","openshift_io_cluster-monitoring":"true","openshift_io_run-level":"","pod-security_kubernetes_io_audit":"privileged","pod-security_kubernetes_io_enforce":"privileged","pod-security_kubernetes_io_warn":"privileged"},"namespace_name":"openshift-cluster-version","pod_id":"4b2b3ab9-297a-4480-8857-d80244289da1","pod_ip":"10.0.45.201","pod_name":"cluster-version-operator-678d8bb44-s8c7x","pod_owner":"ReplicaSet/cluster-version-operator-678d8bb44"},"level":"info","log_type":"infrastructure","message":"I1222 07:57:53.432416 1 sync_worker.go:975] Running sync for rolebinding \"openshift-cloud-credential-operator/cloud-credential-operator\" (270 of 860)","openshift":{"cluster_id":"1e0f1eb9-9174-426b-8e2e-6b401fca7fbe","sequence":29293},"path":"/","source_type":"http_server","timestamp":"2023-12-22T07:57:53.454795108Z"}
設定ファイルを見てみましょう。vector.tomlは大まかに以下の設定になっています。
- kubernetes_logs source
- journald source
- 基本的な加工
- openshift, kube namespaceのログとjournaldをinfrastructureに分類、その他をapplicationに分類
- 受信側vectorへのhttp sink
$ oc -n openshift-logging extract secret/collector-config vector.toml run-vector.sh $ cat vector.toml expire_metrics_secs = 60 [api] enabled = true # Logs from containers (including openshift containers) [sources.raw_container_logs] type = "kubernetes_logs" max_read_bytes = 3145728 glob_minimum_cooldown_ms = 15000 auto_partial_merge = true exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"] pod_annotation_fields.pod_labels = "kubernetes.labels" pod_annotation_fields.pod_namespace = "kubernetes.namespace_name" pod_annotation_fields.pod_annotations = "kubernetes.annotations" pod_annotation_fields.pod_uid = "kubernetes.pod_id" pod_annotation_fields.pod_node_name = "hostname" namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id" rotate_wait_ms = 5000 [sources.raw_journal_logs] type = "journald" journal_directory = "/var/log/journal" [sources.internal_metrics] type = "internal_metrics" [transforms.container_logs] type = "remap" inputs = ["raw_container_logs"] source = ''' .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" if !exists(.level) { .level = "default" if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|<warn>') { .level = "warn" } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|<error>') { .level = "error" } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|<critical>') { .level = "critical" } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|<debug>') { .level = "debug" } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|<notice>') { .level = "notice" } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|<alert>') { .level = "alert" } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|<emergency>') { .level = "emergency" } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|<info>') { .level = "info" } } pod_name = string!(.kubernetes.pod_name) if starts_with(pod_name, "eventrouter-") { parsed, err = parse_json(.message) if err != null { log("Unable to process EventRouter log: " + err, level: "info") } else { ., err = merge(.,parsed) if err == null && exists(.event) && is_object(.event) { if exists(.verb) { .event.verb = .verb del(.verb) } .kubernetes.event = del(.event) .message = del(.kubernetes.event.message) set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp) del(.kubernetes.event.metadata.creationTimestamp) . = compact(., nullish: true) } else { log("Unable to merge EventRouter log message into record: " + err, level: "info") } } } del(.source_type) del(.stream) del(.kubernetes.pod_ips) del(.kubernetes.node_labels) del(.timestamp_end) ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} ''' [transforms.drop_journal_logs] type = "filter" inputs = ["raw_journal_logs"] condition = ".PRIORITY != \"7\" && .PRIORITY != 7" [transforms.journal_logs] type = "remap" inputs = ["drop_journal_logs"] source = ''' .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" .tag = ".journal.system" del(.source_type) del(._CPU_USAGE_NSEC) del(.__REALTIME_TIMESTAMP) del(.__MONOTONIC_TIMESTAMP) del(._SOURCE_REALTIME_TIMESTAMP) del(.JOB_RESULT) del(.JOB_TYPE) del(.TIMESTAMP_BOOTTIME) del(.TIMESTAMP_MONOTONIC) if .PRIORITY == "8" || .PRIORITY == 8 { .level = "trace" } else { priority = to_int!(.PRIORITY) .level, err = to_syslog_level(priority) if err != null { log("Unable to determine level from PRIORITY: " + err, level: "error") log(., level: "error") .level = "unknown" } else { del(.PRIORITY) } } .hostname = del(.host) # systemd’s kernel-specific metadata. # .systemd.k = {} if exists(.KERNEL_DEVICE) { .systemd.k.KERNEL_DEVICE = del(.KERNEL_DEVICE) } if exists(.KERNEL_SUBSYSTEM) { .systemd.k.KERNEL_SUBSYSTEM = del(.KERNEL_SUBSYSTEM) } if exists(.UDEV_DEVLINK) { .systemd.k.UDEV_DEVLINK = del(.UDEV_DEVLINK) } if exists(.UDEV_DEVNODE) { .systemd.k.UDEV_DEVNODE = del(.UDEV_DEVNODE) } if exists(.UDEV_SYSNAME) { .systemd.k.UDEV_SYSNAME = del(.UDEV_SYSNAME) } # trusted journal fields, fields that are implicitly added by the journal and cannot be altered by client code. .systemd.t = {} if exists(._AUDIT_LOGINUID) { .systemd.t.AUDIT_LOGINUID = del(._AUDIT_LOGINUID) } if exists(._BOOT_ID) { .systemd.t.BOOT_ID = del(._BOOT_ID) } if exists(._AUDIT_SESSION) { .systemd.t.AUDIT_SESSION = del(._AUDIT_SESSION) } if exists(._CAP_EFFECTIVE) { .systemd.t.CAP_EFFECTIVE = del(._CAP_EFFECTIVE) } if exists(._CMDLINE) { .systemd.t.CMDLINE = del(._CMDLINE) } if exists(._COMM) { .systemd.t.COMM = del(._COMM) } if exists(._EXE) { .systemd.t.EXE = del(._EXE) } if exists(._GID) { .systemd.t.GID = del(._GID) } if exists(._HOSTNAME) { .systemd.t.HOSTNAME = .hostname } if exists(._LINE_BREAK) { .systemd.t.LINE_BREAK = del(._LINE_BREAK) } if exists(._MACHINE_ID) { .systemd.t.MACHINE_ID = del(._MACHINE_ID) } if exists(._PID) { .systemd.t.PID = del(._PID) } if exists(._SELINUX_CONTEXT) { .systemd.t.SELINUX_CONTEXT = del(._SELINUX_CONTEXT) } if exists(._SOURCE_REALTIME_TIMESTAMP) { .systemd.t.SOURCE_REALTIME_TIMESTAMP = del(._SOURCE_REALTIME_TIMESTAMP) } if exists(._STREAM_ID) { .systemd.t.STREAM_ID = ._STREAM_ID } if exists(._SYSTEMD_CGROUP) { .systemd.t.SYSTEMD_CGROUP = del(._SYSTEMD_CGROUP) } if exists(._SYSTEMD_INVOCATION_ID) {.systemd.t.SYSTEMD_INVOCATION_ID = ._SYSTEMD_INVOCATION_ID} if exists(._SYSTEMD_OWNER_UID) { .systemd.t.SYSTEMD_OWNER_UID = del(._SYSTEMD_OWNER_UID) } if exists(._SYSTEMD_SESSION) { .systemd.t.SYSTEMD_SESSION = del(._SYSTEMD_SESSION) } if exists(._SYSTEMD_SLICE) { .systemd.t.SYSTEMD_SLICE = del(._SYSTEMD_SLICE) } if exists(._SYSTEMD_UNIT) { .systemd.t.SYSTEMD_UNIT = del(._SYSTEMD_UNIT) } if exists(._SYSTEMD_USER_UNIT) { .systemd.t.SYSTEMD_USER_UNIT = del(._SYSTEMD_USER_UNIT) } if exists(._TRANSPORT) { .systemd.t.TRANSPORT = del(._TRANSPORT) } if exists(._UID) { .systemd.t.UID = del(._UID) } # fields that are directly passed from clients and stored in the journal. .systemd.u = {} if exists(.CODE_FILE) { .systemd.u.CODE_FILE = del(.CODE_FILE) } if exists(.CODE_FUNC) { .systemd.u.CODE_FUNCTION = del(.CODE_FUNC) } if exists(.CODE_LINE) { .systemd.u.CODE_LINE = del(.CODE_LINE) } if exists(.ERRNO) { .systemd.u.ERRNO = del(.ERRNO) } if exists(.MESSAGE_ID) { .systemd.u.MESSAGE_ID = del(.MESSAGE_ID) } if exists(.SYSLOG_FACILITY) { .systemd.u.SYSLOG_FACILITY = del(.SYSLOG_FACILITY) } if exists(.SYSLOG_IDENTIFIER) { .systemd.u.SYSLOG_IDENTIFIER = del(.SYSLOG_IDENTIFIER) } if exists(.SYSLOG_PID) { .systemd.u.SYSLOG_PID = del(.SYSLOG_PID) } if exists(.RESULT) { .systemd.u.RESULT = del(.RESULT) } if exists(.UNIT) { .systemd.u.UNIT = del(.UNIT) } .time = format_timestamp!(.timestamp, format: "%FT%T%:z") ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} ''' [transforms.route_container_logs] type = "route" inputs = ["container_logs"] route.app = '!((starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube"))' route.infra = '(starts_with!(.kubernetes.namespace_name,"kube-")) || (starts_with!(.kubernetes.namespace_name,"openshift-")) || (.kubernetes.namespace_name == "default") || (.kubernetes.namespace_name == "openshift") || (.kubernetes.namespace_name == "kube")' # Set log_type to "application" [transforms.application] type = "remap" inputs = ["route_container_logs.app"] source = ''' .log_type = "application" ''' # Set log_type to "infrastructure" [transforms.infrastructure] type = "remap" inputs = ["route_container_logs.infra","journal_logs"] source = ''' .log_type = "infrastructure" ''' [transforms.forward_to_ext_vector_http_user_defined] type = "remap" inputs = ["application","infrastructure"] source = ''' . ''' [transforms.ext_vector_http_normalize_http] type = "remap" inputs = ["forward_to_ext_vector_http_user_defined"] source = ''' del(.file) ''' [transforms.ext_vector_http_dedot] type = "lua" inputs = ["ext_vector_http_normalize_http"] version = "2" hooks.init = "init" hooks.process = "process" source = ''' function init() count = 0 end function process(event, emit) count = count + 1 event.log.openshift.sequence = count if event.log.kubernetes == nil then emit(event) return end if event.log.kubernetes.labels == nil then emit(event) return end dedot(event.log.kubernetes.namespace_labels) dedot(event.log.kubernetes.labels) emit(event) end function dedot(map) if map == nil then return end local new_map = {} local changed_keys = {} for k, v in pairs(map) do local dedotted = string.gsub(k, "[./]", "_") if dedotted ~= k then new_map[dedotted] = v changed_keys[k] = true end end for k in pairs(changed_keys) do map[k] = nil end for k, v in pairs(new_map) do map[k] = v end end ''' [sinks.ext_vector_http] type = "http" inputs = ["ext_vector_http_dedot"] uri = "http://vector-http.tkimura-infra.svc:8080" method = "post" [sinks.ext_vector_http.encoding] codec = "json" [sinks.ext_vector_http.buffer] when_full = "drop_newest" [sinks.ext_vector_http.request] retry_attempts = 17 timeout_secs = 10 # Bearer Auth Config [sinks.ext_vector_http.auth] strategy = "bearer" token = "eyJhbGciOiJSUzI1NiIsImtpZCI6Im03RElRbTF6ZXdvU01mQ3lRcGZUUTZWUXRlVVJ1WHJtdWlVaUo0Y0tmSUUifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJvcGVuc2hpZnQtbG9nZ2luZyIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJsb2djb2xsZWN0b3ItdG9rZW4iLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC5uYW1lIjoibG9nY29sbGVjdG9yIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQudWlkIjoiMjk1N2NjNDAtZTRjNS00YzMzLWE2MGQtMGFiZDU5ZGRjMGJlIiwic3ViIjoic3lzdGVtOnNlcnZpY2VhY2NvdW50Om9wZW5zaGlmdC1sb2dnaW5nOmxvZ2NvbGxlY3RvciJ9.OTVa3zjpdEUCPmQ2vru3UexUBsp63cb6wZWOLdM0cPKAYRj-clibbFa36WBV9OG__L0ujRVnPBBetkh1r9QwO4HpOPCi7owlcDWn_yOc3_tEVsWge5LZT28QEVm2vQh4n_sON1HRcwmnYKqJVMOStpD6WjIW9tIADai9RlW-d47iumC6j8eqXVF1zY-N09jkLAA3ZuBXqBpPmI4W24FZOlPmEAEGwE--H8dYxUjwheD12e1EntYBOjRpnUb9Sx86GqOnG1LIbfb3jv0MbndOFs7kIbus2Xc516OaHoFJfEoxl-SMxGdKcCOtm38S5ikpsfEC6LfcapTV0mg5JitpgOJI9QiUDwZ8ClOKk0z1JxNQQJcJPjg1zy0OjqZrSeIS4TqdzLsAUrA-tuu7HWpbKlwuNVNiLBqKw8N3JqTTEK3j0WeVvEYlBacZsyCBpfH8Vt8234r9ckbs67278tniUmQ5Y-4Y8ybEUjp6cIcs28T13DGfQmWOftlBgltTAmvpXAUCgBu6bTPXZLXCCmCs5VzTfK8W9NcVjOp-fmyIYVm2uBY1mSw5WXIP15I8FCsa22zzMXlGPU1IIDQuNQJCFvGQxeeWa0rZP5T_8wxjhi6tScJHh_XTKcx8sxKJHagYmIvYfF39oI0ZCo_pdCTI5EZ990JiFch5-dVv3zBzIyc" [transforms.add_nodename_to_metric] type = "remap" inputs = ["internal_metrics"] source = ''' .tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME") ''' [sinks.prometheus_output] type = "prometheus_exporter" inputs = ["add_nodename_to_metric"] address = "[::]:24231" default_namespace = "collector" [sinks.prometheus_output.tls] enabled = true key_file = "/etc/collector/metrics/tls.key" crt_file = "/etc/collector/metrics/tls.crt" min_tls_version = "VersionTLS12" ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
また、以前のFluentdではディスクバッファを利用する設定でしたが、Vectorではディスクを利用しないメモリバッファ設定となります。理由はいくつかあり、リソース消費が大きく遅い、遅いのでログが消失する、単に無駄、というものが大きな理由です。
ディスクバッファ設定では、ディスクバッファに一度書いてから転送してバッファ削除、という処理となります。入力がネットワーク受信したログなどであれば合理的な構成ですが、入力がディスク上のログなので、それを読み込んでもう一度ディスクに書き込むのは大きな無駄です。基本的には元となるコンテナログの保持サイズ、期間を伸ばせば済むところです。当然処理が多くなるので、パフォーマンスが低く、CPUやメモリの消費量が大きくなり、かつ転送処理が大幅に遅くなります。ログ出力レートが高いと転送が追いつかなくなり、バッファがフルになり入力がブロックされ(もしくはダイレクトにログ転送破棄が行われ)、その間にもログローテーションが発生すると転送されないログが発生してログ消失となります。ログ出力が多い、nodeあたりPod数が多い、などの条件で顕著にログ消失が発生してしまいます。言い換えると、Fluentd構成でログ消失をなるべく防ぎたい、という場合はノード一台あたりのログ出力レートを小さく保つ必要があるため、多数のコンテナが稼働する大きいサイズのノードを利用することができなくなります。
対してメモリバッファでは単純な強制終了で少量のログが転送されず消失するリスクがある、という点はありますが、上記の問題はすべて回避でき、また以前のFluentdの構成と比べて何十倍という単位で高速で動作するので、リスクとなる「転送処理中のログ」自体が非常に小さく保てます。Vectorにもdisk bufferオプションはありますが、500msインターバルでメモリを永続化する非同期のモデルとなっており、同期ではないので、ディスクバッファ設定を行ったとしても耐障害性ではメモリバッファとあまり大差はなさそうです。
たとえばもし強制終了が多く発生する、そのときにログ転送のログ消失は防止したい、といった特殊な前提条件があるのであれば、Vectorはその要件を満たすことはできなさそうですので、OpenShift LoggingのVectorではなく自前でログ収集エージェントを選定して設定したほうが良いでしょう。
検証していても、以前のFluentd設定では初期設定からログが収集されるまで小一時間待つ必要がありましたが、Vectorでは一瞬で全ログが転送されるのが便利です。