-
Bug
-
Resolution: Done
-
Major
-
Logging 5.4.0
-
False
-
False
-
NEW
-
OBSDA-108 - Distribute an alternate Vector Log Collector
-
VERIFIED
-
-
Logging (Core) - Sprint 219, Logging (Core) - Sprint 220
Description of the problem:
Logs are not sent to Kafka over SASL plaintext when Vector is used as the collector.
Steps to reproduce the issue.
1 Create a Kafka instance with SASL.
2 Create a ClusterLogForwarder to forward logs to Kafka with SASL plaintext.
cat 20_create-clf-kafka-sasl_plaintext.sh
kafka_namespace=${KAFKA_NAMESPACE:-openshift-logging}
kafka_user_name="admin"
kafka_user_password="admin-secret"
oc delete clf instance -n openshift-logging
oc delete secret kafka-fluentd -n openshift-logging
oc create secret generic kafka-fluentd --from-literal=username=${kafka_user_name} --from-literal=password=${kafka_user_password} -n openshift-logging
cat <<EOF |oc create -f -
apiVersion: logging.openshift.io/v1
kind: ClusterLogForwarder
metadata:
  name: instance
  namespace: openshift-logging
spec:
  outputs:
    - name: kafka-app
      url: tls://kafka.${kafka_namespace}.svc.cluster.local:9092/clo-topic
      type: kafka
      secret:
        name: kafka-fluentd
  pipelines:
    - name: test-app
      inputRefs:
        - application
      outputRefs:
        - kafka-app
EOF
3 Create ClusterLogging instance with Vector as collector.
4 Check the Kafka consumer pod logs.
oc logs -f kafka-consumer-sals-plaintext-5cb6f47cb-fl5mb
5 Switch the collector to Fluentd and check the Kafka consumer pod logs again. With Fluentd, the logs are collected.
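To compare steps 4 and 5 with a number rather than by eye, the consumer output can be piped through a small counter. This is a minimal sketch, not part of the original report: the function name count_app_records is hypothetical, and it assumes the consumer prints each record as one JSON line carrying the "log_type":"application" field that the generated Vector config sets. Whether Fluentd records carry the same field in this setup is also an assumption.

```shell
#!/bin/sh
# Count application log records read from stdin.
# Assumption: one JSON record per line, tagged with log_type = "application".
count_app_records() {
  # grep -c prints the number of matching lines; it exits non-zero when the
  # count is 0, so "|| true" keeps the function usable under "set -e".
  grep -c -E '"log_type"[[:space:]]*:[[:space:]]*"application"' || true
}

# Example usage against the consumer pod from step 4:
#   oc logs kafka-consumer-sals-plaintext-5cb6f47cb-fl5mb | count_app_records
```

A count of 0 with Vector and a non-zero count with Fluentd would match the behaviour described in this report.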
Generated Vector config.
# Logs from containers (including openshift containers)
[sources.raw_container_logs]
type = "kubernetes_logs"
auto_partial_merge = true
exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]

[transforms.container_logs]
type = "remap"
inputs = ["raw_container_logs"]
source = '''
  level = "unknown"
  if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
    level = "warn"
  } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
    level = "info"
  } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
    level = "error"
  } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
    level = "debug"
  }
  .level = level
  .pipeline_metadata.collector.name = "vector"
  .pipeline_metadata.collector.version = "0.14.1"
  ip4, err = get_env_var("NODE_IPV4")
  .pipeline_metadata.collector.ipaddr4 = ip4
  received, err = format_timestamp(now(),"%+")
  .pipeline_metadata.collector.received_at = received
  .pipeline_metadata.collector.error = err
'''

[transforms.route_container_logs]
type = "route"
inputs = ["container_logs"]
route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default")'

# Rename log stream to "application"
[transforms.application]
type = "remap"
inputs = ["route_container_logs.app"]
source = """
.log_type = "application"
"""

[transforms.test-app]
type = "remap"
inputs = ["application"]
source = """
.
"""

# Kafka config
[sinks.kafka_app]
type = "kafka"
inputs = ["test-app"]
bootstrap_servers = "kafka.openshift-logging.svc.cluster.local:9092"
topic = "clo-topic"

[sinks.kafka_app.encoding]
codec = "json"
timestamp_format = "rfc3339"
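The generated [sinks.kafka_app] section above contains no SASL settings, even though the ClusterLogForwarder output references a secret with a username and password. The following sketch lists the Kafka sinks in a Vector config file that lack any sasl settings. It is an illustration under stated assumptions, not the operator's own tooling: the function name kafka_sinks_without_sasl is hypothetical, the config is assumed to be saved to a local file with one TOML table header per line, and SASL is assumed to appear either as a [sinks.NAME.sasl] table or as dotted sasl.* keys inside the sink table.

```shell
#!/bin/sh
# Print the name of every Vector kafka sink that has no SASL settings.
# Usage: kafka_sinks_without_sasl /path/to/vector.toml
kafka_sinks_without_sasl() {
  awk '
    /^[ \t]*\[/ {
      # Table header: strip the brackets and split the dotted name.
      table = $0
      gsub(/^[ \t]*\[|\].*$/, "", table)
      depth = split(table, part, ".")
      sink = (part[1] == "sinks" && depth >= 2) ? part[2] : ""
      # A [sinks.NAME.sasl] sub-table counts as SASL configuration.
      if (sink != "" && depth >= 3 && part[3] == "sasl") has_sasl[sink] = 1
      next
    }
    # type = "kafka" directly inside [sinks.NAME] marks a kafka sink.
    sink != "" && depth == 2 && /^[ \t]*type[ \t]*=[ \t]*"kafka"/ { is_kafka[sink] = 1 }
    # Dotted keys such as sasl.enabled inside the sink table also count.
    sink != "" && depth == 2 && /^[ \t]*sasl\./ { has_sasl[sink] = 1 }
    END {
      for (s in is_kafka) if (!(s in has_sasl)) print s
    }
  ' "$1"
}
```

Run against the config shown in this report, the function would print kafka_app, which is consistent with the collector never authenticating to the SASL-enabled broker.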