expire_metrics_secs = 60
data_dir = "/var/lib/vector/openshift-logging/my-custom-clf"

[api]
enabled = true

# Load sensitive data from files
[secret.kubernetes_secret]
type = "file"
base_path = "/var/run/ocp-collector/secrets"

[sources.internal_metrics]
type = "internal_metrics"

# Logs from containers (including openshift containers)
[sources.input_application_container]
type = "kubernetes_logs"
max_read_bytes = 3145728
glob_minimum_cooldown_ms = 15000
auto_partial_merge = true
exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/default_*/*/*.log", "/var/log/pods/kube*_*/*/*.log", "/var/log/pods/openshift*_*/*/*.log"]
pod_annotation_fields.pod_labels = "kubernetes.labels"
pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
pod_annotation_fields.pod_annotations = "kubernetes.annotations"
pod_annotation_fields.pod_uid = "kubernetes.pod_id"
pod_annotation_fields.pod_node_name = "hostname"
namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
rotate_wait_secs = 5

[transforms.input_application_container_meta]
type = "remap"
inputs = ["input_application_container"]
source = '''
  .log_source = "container"
  .log_type = "application"
'''

# Logs from host audit
[sources.input_audit_host]
type = "file"
include = ["/var/log/audit/audit.log"]
host_key = "hostname"
glob_minimum_cooldown_ms = 15000

[transforms.input_audit_host_meta]
type = "remap"
inputs = ["input_audit_host"]
source = '''
  .log_source = "auditd"
  .log_type = "audit"
'''

# Logs from kubernetes audit
[sources.input_audit_kube]
type = "file"
include = ["/var/log/kube-apiserver/audit.log"]
host_key = "hostname"
glob_minimum_cooldown_ms = 15000

[transforms.input_audit_kube_meta]
type = "remap"
inputs = ["input_audit_kube"]
source = '''
  .log_source = "kubeAPI"
  .log_type = "audit"
'''

# Logs from openshift audit
[sources.input_audit_openshift]
type = "file"
include = ["/var/log/oauth-apiserver/audit.log","/var/log/openshift-apiserver/audit.log","/var/log/oauth-server/audit.log"]
host_key = "hostname"
glob_minimum_cooldown_ms = 15000

[transforms.input_audit_openshift_meta]
type = "remap"
inputs = ["input_audit_openshift"]
source = '''
  .log_source = "openshiftAPI"
  .log_type = "audit"
'''

# Logs from ovn audit
[sources.input_audit_ovn]
type = "file"
include = ["/var/log/ovn/acl-audit-log.log"]
host_key = "hostname"
glob_minimum_cooldown_ms = 15000

[transforms.input_audit_ovn_meta]
type = "remap"
inputs = ["input_audit_ovn"]
source = '''
  .log_source = "ovn"
  .log_type = "audit"
'''

# Logs from containers (including openshift containers)
[sources.input_infrastructure_container]
type = "kubernetes_logs"
max_read_bytes = 3145728
glob_minimum_cooldown_ms = 15000
auto_partial_merge = true
include_paths_glob_patterns = ["/var/log/pods/default_*/*/*.log", "/var/log/pods/kube*_*/*/*.log", "/var/log/pods/openshift*_*/*/*.log"]
exclude_paths_glob_patterns = ["/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.log.*", "/var/log/pods/*/*/*.tmp", "/var/log/pods/openshift-logging_*/gateway/*.log", "/var/log/pods/openshift-logging_*/loki*/*.log", "/var/log/pods/openshift-logging_*/opa/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/openshift-logging_logfilesmetricexporter-*/*/*.log"]
pod_annotation_fields.pod_labels = "kubernetes.labels"
pod_annotation_fields.pod_namespace = "kubernetes.namespace_name"
pod_annotation_fields.pod_annotations = "kubernetes.annotations"
pod_annotation_fields.pod_uid = "kubernetes.pod_id"
pod_annotation_fields.pod_node_name = "hostname"
namespace_annotation_fields.namespace_uid = "kubernetes.namespace_id"
rotate_wait_secs = 5

[transforms.input_infrastructure_container_meta]
type = "remap"
inputs = ["input_infrastructure_container"]
source = '''
  .log_source = "container"
  .log_type = "infrastructure"
'''

[sources.input_infrastructure_journal]
type = "journald"
journal_directory = "/var/log/journal"

[transforms.input_infrastructure_journal_meta]
type = "remap"
inputs = ["input_infrastructure_journal"]
source = '''
  .log_source = "node"
  .log_type = "infrastructure"
'''

[transforms.pipeline_test_app_viaq_0]
type = "remap"
inputs = ["input_application_container_meta"]
source = '''
  if .log_source == "container" {
    .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
    if !exists(.level) {
      .level = "default"
      if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"') {
        .level = "warn"
      } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"') {
        .level = "error"
      } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"') {
        .level = "critical"
      } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"') {
        .level = "debug"
      } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"') {
        .level = "notice"
      } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"') {
        .level = "alert"
      } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"') {
        .level = "emergency"
      } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"') {
        .level = "info"
      }
    }
    pod_name = string!(.kubernetes.pod_name)
    if starts_with(pod_name, "eventrouter-") {
      parsed, err = parse_json(.message)
      if err != null {
        log("Unable to process EventRouter log: " + err, level: "info")
      } else {
        ., err = merge(.,parsed)
        if err == null && exists(.event) && is_object(.event) {
          if exists(.verb) {
            .event.verb = .verb
            del(.verb)
          }
          .kubernetes.event = del(.event)
          .message = del(.kubernetes.event.message)
          . = set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp)
          del(.kubernetes.event.metadata.creationTimestamp)
          . = compact(., nullish: true)
        } else {
          log("Unable to merge EventRouter log message into record: " + err, level: "info")
        }
      }
    }
    del(._partial)
    del(.file)
    del(.source_type)
    del(.stream)
    del(.kubernetes.pod_ips)
    del(.kubernetes.node_labels)
    del(.timestamp_end)
    ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
    .openshift.sequence = to_unix_timestamp(now(), unit: "nanoseconds")
    if exists(.kubernetes.namespace_labels) {
      for_each(object!(.kubernetes.namespace_labels)) -> |key,value| {
        newkey = replace(key, r'[\./]', "_")
        .kubernetes.namespace_labels = set!(.kubernetes.namespace_labels,[newkey],value)
        if newkey != key {
          .kubernetes.namespace_labels = remove!(.kubernetes.namespace_labels,[key],true)
        }
      }
    }
    if exists(.kubernetes.labels) {
      for_each(object!(.kubernetes.labels)) -> |key,value| {
        newkey = replace(key, r'[\./]', "_")
        .kubernetes.labels = set!(.kubernetes.labels,[newkey],value)
        if newkey != key {
          .kubernetes.labels = remove!(.kubernetes.labels,[key],true)
        }
      }
    }
  }
'''

[transforms.pipeline_test_audit_viaq_0]
type = "remap"
inputs = ["input_audit_host_meta","input_audit_kube_meta","input_audit_openshift_meta","input_audit_ovn_meta"]
source = '''
  if .log_type == "audit" && .log_source == "auditd" {
    .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
    ._internal.message = .message
    del(.file)
    del(.source_type)
    match1 = parse_regex(.message, r'type=(?P<type>[^ ]+)') ?? {}
    envelop = {}
    envelop |= {"type": match1.type}
    match2, err = parse_regex(.message, r'msg=audit\((?P<ts_record>[^ ]+)\):')
    if err == null {
      sp, err = split(match2.ts_record,":")
      if err == null && length(sp) == 2 {
        ts = parse_timestamp(sp[0],"%s.%3f") ?? ""
        envelop |= {"record_id": sp[1]}
        . |= {"audit.linux" : envelop}
        . |= {"@timestamp" : format_timestamp(ts,"%+") ?? ""}
      }
    } else {
      log("could not parse host audit msg. err=" + err, rate_limit_secs: 0)
    }
    .level = "default"
    .hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""
    ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
  }
  if .log_type == "audit" && .log_source == "kubeAPI" {
    .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
    ._internal.message = .message
    del(.file)
    del(.source_type)
    . = merge(., parse_json!(string!(.message))) ?? .
    del(.message)
    .k8s_audit_level = .level
    .hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""
    ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
  }
  if .log_type == "audit" && .log_source == "openshiftAPI" {
    .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
    ._internal.message = .message
    del(.file)
    del(.source_type)
    . = merge(., parse_json!(string!(.message))) ?? .
    del(.message)
    .openshift_audit_level = .level
    .hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? ""
"" ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} } if .log_type == "audit" && .log_source == "ovn" { .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" del(.file) del(.source_type) if !exists(.level) { .level = "default" if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|') { .level = "warn" } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|') { .level = "error" } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|') { .level = "critical" } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|') { .level = "debug" } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|') { .level = "notice" } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|') { .level = "alert" } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|') { .level = "emergency" } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|') { .level = "info" } } .hostname = get_env_var("VECTOR_SELF_NODE_NAME") ?? "" ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts} } ''' [transforms.pipeline_test_infra_viaqjournal_0] type = "filter" inputs = ["input_infrastructure_container_meta","input_infrastructure_journal_meta"] condition = ''' (.log_source == "node" && .PRIORITY != "7" && .PRIORITY != 7) || .log_source == "container" || .log_type == "audit" ''' [transforms.pipeline_test_infra_viaq_1] type = "remap" inputs = ["pipeline_test_infra_viaqjournal_0"] source = ''' if .log_source == "container" { .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}" if !exists(.level) { .level = "default" if match!(.message, r'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"|') { .level = "warn" } else if match!(.message, r'Error|ERROR|^E[0-9]+|level=error|Value:error|"level":"error"|') { .level = "error" } else if match!(.message, r'Critical|CRITICAL|^C[0-9]+|level=critical|Value:critical|"level":"critical"|') { .level = "critical" } else if match!(.message, r'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"|') { .level = "debug" } else if match!(.message, r'Notice|NOTICE|^N[0-9]+|level=notice|Value:notice|"level":"notice"|') { .level = "notice" } else if match!(.message, r'Alert|ALERT|^A[0-9]+|level=alert|Value:alert|"level":"alert"|') { .level = "alert" } else if match!(.message, r'Emergency|EMERGENCY|^EM[0-9]+|level=emergency|Value:emergency|"level":"emergency"|') { .level = "emergency" } else if match!(.message, r'(?i)\b(?:info)\b|^I[0-9]+|level=info|Value:info|"level":"info"|') { .level = "info" } } pod_name = string!(.kubernetes.pod_name) if starts_with(pod_name, "eventrouter-") { parsed, err = parse_json(.message) if err != null { log("Unable to process EventRouter log: " + err, level: "info") } else { ., err = merge(.,parsed) if err == null && exists(.event) && is_object(.event) { if exists(.verb) { .event.verb = .verb del(.verb) } .kubernetes.event = del(.event) .message = del(.kubernetes.event.message) . = set!(., ["@timestamp"], .kubernetes.event.metadata.creationTimestamp) del(.kubernetes.event.metadata.creationTimestamp) . 
          . = compact(., nullish: true)
        } else {
          log("Unable to merge EventRouter log message into record: " + err, level: "info")
        }
      }
    }
    del(._partial)
    del(.file)
    del(.source_type)
    del(.stream)
    del(.kubernetes.pod_ips)
    del(.kubernetes.node_labels)
    del(.timestamp_end)
    ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
    .openshift.sequence = to_unix_timestamp(now(), unit: "nanoseconds")
    if exists(.kubernetes.namespace_labels) {
      for_each(object!(.kubernetes.namespace_labels)) -> |key,value| {
        newkey = replace(key, r'[\./]', "_")
        .kubernetes.namespace_labels = set!(.kubernetes.namespace_labels,[newkey],value)
        if newkey != key {
          .kubernetes.namespace_labels = remove!(.kubernetes.namespace_labels,[key],true)
        }
      }
    }
    if exists(.kubernetes.labels) {
      for_each(object!(.kubernetes.labels)) -> |key,value| {
        newkey = replace(key, r'[\./]', "_")
        .kubernetes.labels = set!(.kubernetes.labels,[newkey],value)
        if newkey != key {
          .kubernetes.labels = remove!(.kubernetes.labels,[key],true)
        }
      }
    }
  }
  if .log_source == "node" {
    .openshift.cluster_id = "${OPENSHIFT_CLUSTER_ID:-}"
    .tag = ".journal.system"
    del(.source_type)
    del(._CPU_USAGE_NSEC)
    del(.__REALTIME_TIMESTAMP)
    del(.__MONOTONIC_TIMESTAMP)
    del(._SOURCE_REALTIME_TIMESTAMP)
    del(.JOB_RESULT)
    del(.JOB_TYPE)
    del(.TIMESTAMP_BOOTTIME)
    del(.TIMESTAMP_MONOTONIC)
    if .PRIORITY == "8" || .PRIORITY == 8 {
      .level = "trace"
    } else {
      priority = to_int!(.PRIORITY)
      .level, err = to_syslog_level(priority)
      if err != null {
        log("Unable to determine level from PRIORITY: " + err, level: "error")
        log(., level: "error")
        .level = "unknown"
      } else {
        del(.PRIORITY)
      }
    }
    .hostname = del(.host)

    # systemd's kernel-specific metadata.
    # .systemd.k = {}
    if exists(.KERNEL_DEVICE) { .systemd.k.KERNEL_DEVICE = del(.KERNEL_DEVICE) }
    if exists(.KERNEL_SUBSYSTEM) { .systemd.k.KERNEL_SUBSYSTEM = del(.KERNEL_SUBSYSTEM) }
    if exists(.UDEV_DEVLINK) { .systemd.k.UDEV_DEVLINK = del(.UDEV_DEVLINK) }
    if exists(.UDEV_DEVNODE) { .systemd.k.UDEV_DEVNODE = del(.UDEV_DEVNODE) }
    if exists(.UDEV_SYSNAME) { .systemd.k.UDEV_SYSNAME = del(.UDEV_SYSNAME) }

    # trusted journal fields, fields that are implicitly added by the journal and cannot be altered by client code.
    .systemd.t = {}
    if exists(._AUDIT_LOGINUID) { .systemd.t.AUDIT_LOGINUID = del(._AUDIT_LOGINUID) }
    if exists(._BOOT_ID) { .systemd.t.BOOT_ID = del(._BOOT_ID) }
    if exists(._AUDIT_SESSION) { .systemd.t.AUDIT_SESSION = del(._AUDIT_SESSION) }
    if exists(._CAP_EFFECTIVE) { .systemd.t.CAP_EFFECTIVE = del(._CAP_EFFECTIVE) }
    if exists(._CMDLINE) { .systemd.t.CMDLINE = del(._CMDLINE) }
    if exists(._COMM) { .systemd.t.COMM = del(._COMM) }
    if exists(._EXE) { .systemd.t.EXE = del(._EXE) }
    if exists(._GID) { .systemd.t.GID = del(._GID) }
    if exists(._HOSTNAME) { .systemd.t.HOSTNAME = .hostname }
    if exists(._LINE_BREAK) { .systemd.t.LINE_BREAK = del(._LINE_BREAK) }
    if exists(._MACHINE_ID) { .systemd.t.MACHINE_ID = del(._MACHINE_ID) }
    if exists(._PID) { .systemd.t.PID = del(._PID) }
    if exists(._SELINUX_CONTEXT) { .systemd.t.SELINUX_CONTEXT = del(._SELINUX_CONTEXT) }
    if exists(._SOURCE_REALTIME_TIMESTAMP) { .systemd.t.SOURCE_REALTIME_TIMESTAMP = del(._SOURCE_REALTIME_TIMESTAMP) }
    if exists(._STREAM_ID) { .systemd.t.STREAM_ID = ._STREAM_ID }
    if exists(._SYSTEMD_CGROUP) { .systemd.t.SYSTEMD_CGROUP = del(._SYSTEMD_CGROUP) }
    if exists(._SYSTEMD_INVOCATION_ID) { .systemd.t.SYSTEMD_INVOCATION_ID = ._SYSTEMD_INVOCATION_ID }
    if exists(._SYSTEMD_OWNER_UID) { .systemd.t.SYSTEMD_OWNER_UID = del(._SYSTEMD_OWNER_UID) }
    if exists(._SYSTEMD_SESSION) { .systemd.t.SYSTEMD_SESSION = del(._SYSTEMD_SESSION) }
    if exists(._SYSTEMD_SLICE) { .systemd.t.SYSTEMD_SLICE = del(._SYSTEMD_SLICE) }
    if exists(._SYSTEMD_UNIT) { .systemd.t.SYSTEMD_UNIT = del(._SYSTEMD_UNIT) }
    if exists(._SYSTEMD_USER_UNIT) { .systemd.t.SYSTEMD_USER_UNIT = del(._SYSTEMD_USER_UNIT) }
    if exists(._TRANSPORT) { .systemd.t.TRANSPORT = del(._TRANSPORT) }
    if exists(._UID) { .systemd.t.UID = del(._UID) }

    # fields that are directly passed from clients and stored in the journal.
    .systemd.u = {}
    if exists(.CODE_FILE) { .systemd.u.CODE_FILE = del(.CODE_FILE) }
    if exists(.CODE_FUNC) { .systemd.u.CODE_FUNCTION = del(.CODE_FUNC) }
    if exists(.CODE_LINE) { .systemd.u.CODE_LINE = del(.CODE_LINE) }
    if exists(.ERRNO) { .systemd.u.ERRNO = del(.ERRNO) }
    if exists(.MESSAGE_ID) { .systemd.u.MESSAGE_ID = del(.MESSAGE_ID) }
    if exists(.SYSLOG_FACILITY) { .systemd.u.SYSLOG_FACILITY = del(.SYSLOG_FACILITY) }
    if exists(.SYSLOG_IDENTIFIER) { .systemd.u.SYSLOG_IDENTIFIER = del(.SYSLOG_IDENTIFIER) }
    if exists(.SYSLOG_PID) { .systemd.u.SYSLOG_PID = del(.SYSLOG_PID) }
    if exists(.RESULT) { .systemd.u.RESULT = del(.RESULT) }
    if exists(.UNIT) { .systemd.u.UNIT = del(.UNIT) }
    .time = format_timestamp!(.timestamp, format: "%FT%T%:z")
    ts = del(.timestamp); if !exists(."@timestamp") {."@timestamp" = ts}
  }
'''

# Kafka Topic
[transforms.output_kafka_app_topic]
type = "remap"
inputs = ["pipeline_test_app_viaq_0"]
source = '''
  ._internal.output_kafka_app_topic = "topic-logging-app"
'''

[sinks.output_kafka_app]
type = "kafka"
inputs = ["output_kafka_app_topic"]
bootstrap_servers = "my-cluster-kafka-bootstrap.amq-aosqe.svc:9092"
topic = "{{ _internal.output_kafka_app_topic }}"

[sinks.output_kafka_app.encoding]
codec = "json"
timestamp_format = "rfc3339"
except_fields = ["_internal"]

# Kafka Topic
[transforms.output_kafka_audit_topic]
type = "remap"
inputs = ["pipeline_test_audit_viaq_0"]
source = '''
  ._internal.output_kafka_audit_topic = "topic-logging-audit"
'''

[sinks.output_kafka_audit]
type = "kafka"
inputs = ["output_kafka_audit_topic"]
bootstrap_servers = "my-cluster-kafka-bootstrap.amq-aosqe.svc:9092"
topic = "{{ _internal.output_kafka_audit_topic }}"

[sinks.output_kafka_audit.encoding]
codec = "json"
timestamp_format = "rfc3339"
except_fields = ["_internal"]

# Kafka Topic
[transforms.output_kafka_infra_topic]
type = "remap"
inputs = ["pipeline_test_infra_viaq_1"]
source = '''
  ._internal.output_kafka_infra_topic = "topic-logging-infra"
'''

[sinks.output_kafka_infra]
type = "kafka"
inputs = ["output_kafka_infra_topic"]
bootstrap_servers = "my-cluster-kafka-bootstrap.amq-aosqe.svc:9092"
topic = "{{ _internal.output_kafka_infra_topic }}"

[sinks.output_kafka_infra.encoding]
codec = "json"
timestamp_format = "rfc3339"
except_fields = ["_internal"]

[transforms.add_nodename_to_metric]
type = "remap"
inputs = ["internal_metrics"]
source = '''
  .tags.hostname = get_env_var!("VECTOR_SELF_NODE_NAME")
'''

[sinks.prometheus_output]
type = "prometheus_exporter"
inputs = ["add_nodename_to_metric"]
address = "[::]:24231"
default_namespace = "collector"

[sinks.prometheus_output.tls]
enabled = true
key_file = "/etc/collector/metrics/tls.key"
crt_file = "/etc/collector/metrics/tls.crt"
min_tls_version = "VersionTLS12"
ciphersuites = "TLS_AES_128_GCM_SHA256,TLS_AES_256_GCM_SHA384,TLS_CHACHA20_POLY1305_SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-CHACHA20-POLY1305,ECDHE-RSA-CHACHA20-POLY1305,DHE-RSA-AES128-GCM-SHA256,DHE-RSA-AES256-GCM-SHA384"
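
# ---------------------------------------------------------------------------
# Note (not part of the generated configuration): a minimal sketch of how a
# rendered config like this one can be sanity-checked before rollout, assuming
# the `vector` binary is available and the file has been saved locally; the
# path below is illustrative only. `vector validate` parses the TOML, compiles
# the VRL in each remap transform, and checks that every `inputs` entry refers
# to a defined component:
#
#   vector validate /tmp/my-custom-clf.toml
# ---------------------------------------------------------------------------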