OpenShift Logging / LOG-2070

[Vector] Collector pods fail to start when a ClusterLogForwarder is created to forward logs to Kafka.


    • Sprint: Logging (Core) - Sprint 213

      Description of problem:

      When a ClusterLogForwarder is created to forward logs to Kafka, the collector pods fail to start due to a Vector configuration error.

      Steps to reproduce:

      1. Deploy a ClusterLogging instance with Vector as the collector.

      apiVersion: "logging.openshift.io/v1"
      kind: "ClusterLogging"
      metadata:
        name: "instance" 
        namespace: "openshift-logging"
      spec:
        managementState: "Managed"  
        logStore:
          type: "elasticsearch"  
          retentionPolicy: 
            application:
              maxAge: 1d
            infra:
              maxAge: 7d
            audit:
              maxAge: 7d
          elasticsearch:
            nodeCount: 1 
            storage: {}
            resources: 
                requests:
                  memory: "1Gi"
            proxy: 
              resources:
                limits:
                  memory: 256Mi
                requests:
                  memory: 256Mi
            redundancyPolicy: "ZeroRedundancy"
        visualization:
          type: "kibana"  
          kibana:
            replicas: 1
        collection:
          logs:
            type: "vector"  
            vector: {}
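
      After applying the CR, the collector DaemonSet should roll out and its pods should reach Running before the forwarder is created. A small sanity-check sketch with standard oc commands (the file name clusterlogging.yaml is hypothetical):

      # Save the CR above as clusterlogging.yaml (hypothetical file name) and apply it
      oc apply -f clusterlogging.yaml
      # Before the ClusterLogForwarder exists, the collector pods should reach Running
      oc get pods -n openshift-logging | grep '^collector-'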
       

      2. Create a ClusterLogForwarder instance to forward logs to a Kafka instance.

       cat 20_create-clf-kafka-sasl_plaintext.sh
      kafka_namespace=${KAFKA_NAMESPACE:-openshift-logging}
      kafka_user_name="admin"
      kafka_user_password="admin-secret"
      oc delete clf instance -n openshift-logging
      oc delete secret kafka-fluentd -n openshift-logging
      oc create secret generic kafka-fluentd --from-literal=username=${kafka_user_name} --from-literal=password=${kafka_user_password} -n openshift-logging
      cat <<EOF |oc create -f -
      apiVersion: logging.openshift.io/v1
      kind: ClusterLogForwarder
      metadata:
        name: instance
        namespace: openshift-logging
      spec:
        outputs:
          - name: kafka-app
            url: tls://kafka.${kafka_namespace}.svc.cluster.local:9092/clo-topic
            type: kafka
            secret:
              name: kafka-fluentd
        pipelines:
          - name: test-app
            inputRefs:
            - application
            outputRefs:
            - kafka-app
      EOF
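
      Before checking the pods, it can help to confirm that the secret and the forwarder were actually created; a small sketch using standard oc commands:

      # Confirm the credentials secret and the ClusterLogForwarder exist
      oc get secret kafka-fluentd -n openshift-logging
      oc get clusterlogforwarder instance -n openshift-logging
      # The operator then regenerates the collector configuration and restarts the collector pods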
      

      3. Check the collector pod status and logs.

      collector-pm87f                                 1/2     CrashLoopBackOff   3 (31s ago)   82s
      
      $ oc logs collector-pm87f -c collector
      Dec 15 09:21:07.268  INFO vector::app: Log level is enabled. level="debug"
      Dec 15 09:21:07.268  INFO vector::sources::host_metrics: PROCFS_ROOT is unset. Using default '/proc' for procfs root.
      Dec 15 09:21:07.268  INFO vector::sources::host_metrics: SYSFS_ROOT is unset. Using default '/sys' for sysfs root.
      Dec 15 09:21:07.268  INFO vector::app: Loading configs. path=[("/etc/vector/vector.toml", Some(Toml))]
      Dec 15 09:21:07.273 ERROR vector::cli: Configuration error. error=Input "raw_journal_logs" for transform "journal_logs" doesn't exist.
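
      The rendered configuration can also be pulled from the cluster for offline inspection. A sketch, assuming the operator stores it in a secret named collector-config under a vector.toml key (the object name and key may differ between releases):

      # Dump the generated Vector configuration (secret name and key are assumptions)
      oc extract secret/collector-config -n openshift-logging --keys=vector.toml --to=-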
       

      Generated vector.toml

      cat vector.toml 
      # Logs from containers (including openshift containers)
      [sources.raw_container_logs]
      type = "kubernetes_logs"
      auto_partial_merge = true
      exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]
      [transforms.container_logs]
      type = "remap"
      inputs = ["raw_container_logs"]
      source = '''
        level = "unknown"
        if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
          level = "warn"
        } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
          level = "info"
        } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
          level = "error"
        } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
          level = "debug"
        }
        .level = level
        .pipeline_metadata.collector.name = "vector"
        .pipeline_metadata.collector.version = "0.14.1"
        ip4, err = get_env_var("NODE_IPV4")
        .pipeline_metadata.collector.ipaddr4 = ip4
        received, err = format_timestamp(now(),"%+")
        .pipeline_metadata.collector.received_at = received
        .pipeline_metadata.collector.error = err
       '''
      [transforms.journal_logs]
      type = "remap"
      inputs = ["raw_journal_logs"]
      source = '''
        level = "unknown"
        if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
          level = "warn"
        } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
          level = "info"
        } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
          level = "error"
        } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
          level = "debug"
        }
        .level = level
        .pipeline_metadata.collector.name = "vector"
        .pipeline_metadata.collector.version = "0.14.1"
        ip4, err = get_env_var("NODE_IPV4")
        .pipeline_metadata.collector.ipaddr4 = ip4
        received, err = format_timestamp(now(),"%+")
        .pipeline_metadata.collector.received_at = received
        .pipeline_metadata.collector.error = err
       '''
      [transforms.route_container_logs]
      type = "route"
      inputs = ["container_logs"]
      route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") && starts_with!(.kubernetes.pod_namespace,"openshift") && .kubernetes.pod_namespace == "default")'
      # Rename log stream to "application"
      [transforms.application]
      type = "remap"
      inputs = ["route_container_logs.app"]
      source = """
      .log_type = "app"
      """
      [transforms.test-app]
      type = "remap"
      inputs = ["application"]
      source = """
      .
      """# Kafka config
      [sinks.kafka_app]
      type = "kafka"
      inputs = ["test-app"]
      bootstrap_servers = "kafka.openshift-logging.svc.cluster.local:9092"
      topic = "clo-topic"[sinks.kafka_app.encoding]
      codec = "json"
      timestamp_format = "rfc3339" 
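
      The error is consistent with the config above: the journal_logs transform takes raw_journal_logs as an input, but no [sources.raw_journal_logs] stanza is emitted anywhere in the file, so Vector rejects the whole configuration. This can be confirmed on a local copy of the dump (a sketch):

      # raw_journal_logs only appears in the transform's inputs list; no source stanza defines it
      grep -n 'raw_journal_logs' vector.toml
      # Vector's validator reproduces the startup failure without touching the cluster
      vector validate vector.toml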

              aguptaredhat Ajay Gupta (Inactive)
              rhn-support-ikanse Ishwar Kanse
              Votes: 0
              Watchers: 3
