Bug
Resolution: Done
Undefined
Logging 5.4.0
False
False
NEW
OBSDA-108 - Distribute an alternate Vector Log Collector
VERIFIED
Logging (Core) - Sprint 213
Description of problem:
When a ClusterLogForwarder is created to forward logs to Kafka, the collector pods fail to start because of a Vector configuration error.
Steps to reproduce:
1. Deploy a ClusterLogging instance with Vector as the collector.
apiVersion: "logging.openshift.io/v1"
kind: "ClusterLogging"
metadata:
  name: "instance"
  namespace: "openshift-logging"
spec:
  managementState: "Managed"
  logStore:
    type: "elasticsearch"
    retentionPolicy:
      application:
        maxAge: 1d
      infra:
        maxAge: 7d
      audit:
        maxAge: 7d
    elasticsearch:
      nodeCount: 1
      storage: {}
      resources:
        requests:
          memory: "1Gi"
      proxy:
        resources:
          limits:
            memory: 256Mi
          requests:
            memory: 256Mi
      redundancyPolicy: "ZeroRedundancy"
  visualization:
    type: "kibana"
    kibana:
      replicas: 1
  collection:
    logs:
      type: "vector"
      vector: {}
2. Create a ClusterLogForwarder instance to forward logs to a Kafka instance.
cat 20_create-clf-kafka-sasl_plaintext.sh
kafka_namespace=${KAFKA_NAMESPACE:-openshift-logging}
kafka_user_name="admin"
kafka_user_password="admin-secret"

oc delete clf instance -n openshift-logging
oc delete secret kafka-fluentd -n openshift-logging
oc create secret generic kafka-fluentd --from-literal=username=${kafka_user_name} --from-literal=password=${kafka_user_password} -n openshift-logging
cat <<EOF |oc create -f -
apiVersion: logging.openshift.io/v1
kind: ClusterLogForwarder
metadata:
  name: instance
  namespace: openshift-logging
spec:
  outputs:
    - name: kafka-app
      url: tls://kafka.${kafka_namespace}.svc.cluster.local:9092/clo-topic
      type: kafka
      secret:
        name: kafka-fluentd
  pipelines:
    - name: test-app
      inputRefs:
        - application
      outputRefs:
        - kafka-app
EOF
3. Check the collector pod status and logs.
collector-pm87f 1/2 CrashLoopBackOff 3 (31s ago) 82s

$ oc logs collector-pm87f -c collector
Dec 15 09:21:07.268 INFO vector::app: Log level is enabled. level="debug"
Dec 15 09:21:07.268 INFO vector::sources::host_metrics: PROCFS_ROOT is unset. Using default '/proc' for procfs root.
Dec 15 09:21:07.268 INFO vector::sources::host_metrics: SYSFS_ROOT is unset. Using default '/sys' for sysfs root.
Dec 15 09:21:07.268 INFO vector::app: Loading configs. path=[("/etc/vector/vector.toml", Some(Toml))]
Dec 15 09:21:07.273 ERROR vector::cli: Configuration error. error=Input "raw_journal_logs" for transform "journal_logs" doesn't exist.
Generated vector.toml:
cat vector.toml
# Logs from containers (including openshift containers)
[sources.raw_container_logs]
type = "kubernetes_logs"
auto_partial_merge = true
exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]

[transforms.container_logs]
type = "remap"
inputs = ["raw_container_logs"]
source = '''
  level = "unknown"
  if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
    level = "warn"
  } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
    level = "info"
  } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
    level = "error"
  } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
    level = "debug"
  }
  .level = level

  .pipeline_metadata.collector.name = "vector"
  .pipeline_metadata.collector.version = "0.14.1"
  ip4, err = get_env_var("NODE_IPV4")
  .pipeline_metadata.collector.ipaddr4 = ip4
  received, err = format_timestamp(now(),"%+")
  .pipeline_metadata.collector.received_at = received
  .pipeline_metadata.collector.error = err
'''

[transforms.journal_logs]
type = "remap"
inputs = ["raw_journal_logs"]
source = '''
  level = "unknown"
  if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
    level = "warn"
  } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
    level = "info"
  } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
    level = "error"
  } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
    level = "debug"
  }
  .level = level

  .pipeline_metadata.collector.name = "vector"
  .pipeline_metadata.collector.version = "0.14.1"
  ip4, err = get_env_var("NODE_IPV4")
  .pipeline_metadata.collector.ipaddr4 = ip4
  received, err = format_timestamp(now(),"%+")
  .pipeline_metadata.collector.received_at = received
  .pipeline_metadata.collector.error = err
'''

[transforms.route_container_logs]
type = "route"
inputs = ["container_logs"]
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") && starts_with!(.kubernetes.pod_namespace,"openshift") && .kubernetes.pod_namespace == "default")'

# Rename log stream to "application"
[transforms.application]
type = "remap"
inputs = ["route_container_logs.app"]
source = """
.log_type = "app"
"""

[transforms.test-app]
type = "remap"
inputs = ["application"]
source = """
.
"""

# Kafka config
[sinks.kafka_app]
type = "kafka"
inputs = ["test-app"]
bootstrap_servers = "kafka.openshift-logging.svc.cluster.local:9092"
topic = "clo-topic"

[sinks.kafka_app.encoding]
codec = "json"
timestamp_format = "rfc3339"
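
The error in step 3 can be reproduced offline by checking every `inputs` entry in the generated configuration against the components it defines. The following is a minimal sketch of such a check, not Vector's own validation logic. The function names `find_dangling_inputs` and `load_vector_config` are hypothetical, and the `reported` dictionary is a hand-trimmed mirror of the component graph in the generated vector.toml (only names and inputs are kept). It assumes that a reference such as `route_container_logs.app` is satisfied when the component before the last dot is defined.

```python
def find_dangling_inputs(config: dict) -> list[tuple[str, str]]:
    """Return (component, missing_input) pairs for inputs that name no defined component."""
    sources = config.get("sources", {})
    transforms = config.get("transforms", {})
    sinks = config.get("sinks", {})

    # Only sources and transforms can feed other components.
    defined = set(sources) | set(transforms)

    dangling = []
    for section in (transforms, sinks):
        for name, component in section.items():
            for ref in component.get("inputs", []):
                # Assumption: "route_container_logs.app" selects a named output
                # of a component, so also accept the part before the last dot.
                base = ref.rsplit(".", 1)[0]
                if ref not in defined and base not in defined:
                    dangling.append((name, ref))
    return dangling


def load_vector_config(path: str) -> dict:
    """Parse a vector.toml file into a dictionary."""
    import tomllib  # in the standard library from Python 3.11 onward

    with open(path, "rb") as fh:
        return tomllib.load(fh)


# Hand-trimmed mirror of the component graph in the generated vector.toml.
reported = {
    "sources": {"raw_container_logs": {"type": "kubernetes_logs"}},
    "transforms": {
        "container_logs": {"inputs": ["raw_container_logs"]},
        "journal_logs": {"inputs": ["raw_journal_logs"]},
        "route_container_logs": {"inputs": ["container_logs"]},
        "application": {"inputs": ["route_container_logs.app"]},
        "test-app": {"inputs": ["application"]},
    },
    "sinks": {"kafka_app": {"inputs": ["test-app"]}},
}

for component, missing in find_dangling_inputs(reported):
    print(f'Input "{missing}" for component "{component}" does not exist.')
```

On the trimmed graph the only pair reported is `journal_logs` with `raw_journal_logs`, which matches the collector log: the `journal_logs` transform is generated, but no `raw_journal_logs` source is. To run the same check against a real file, pass the result of `load_vector_config("vector.toml")` to `find_dangling_inputs`.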