Uploaded image for project: 'OpenShift Logging'
  1. OpenShift Logging
  2. LOG-2072

[Vector] Collector pods fail to start when a ClusterLogForwarder instance is created to forward logs to multiple log stores.

XMLWordPrintable

    • Logging (Core) - Sprint 213

      Description of problem:

      When a ClusterLogForwarder instance is created to forward logs to multiple log stores, the collector pods fail to start with configuration error.

      Steps to reproduce:

      1 Create a Cluster Logging instance with Vector as collector.

      2 Create a ClusterLogForwarder to forward logs to multiple log stores.

      apiVersion: logging.openshift.io/v1
      kind: ClusterLogForwarder
      metadata:
        name: instance
        namespace: openshift-logging
      spec:
        outputs:
          - name: es-created-by-user
            type: elasticsearch
            url: 'http://elasticsearch-server.aosqe-es.svc:9200'
        pipelines:
          - name: forward-to-external-es
            inputRefs:
            - infrastructure
            - application
            - audit
            outputRefs:
            - es-created-by-user
            - default
       

      3 Check the collector pod status and logs.

      collector-rk2bk                                 1/2     CrashLoopBackOff   9 (48s ago)   21m
      
      oc logs collector-rk2bk -c collector
      Dec 16 07:20:07.536  INFO vector::app: Log level is enabled. level="debug"
      Dec 16 07:20:07.537  INFO vector::sources::host_metrics: PROCFS_ROOT is unset. Using default '/proc' for procfs root.
      Dec 16 07:20:07.537  INFO vector::sources::host_metrics: SYSFS_ROOT is unset. Using default '/sys' for sysfs root.
      Dec 16 07:20:07.537  INFO vector::app: Loading configs. path=[("/etc/vector/vector.toml", Some(Toml))]
      Dec 16 07:20:07.539 ERROR vector::cli: Configuration error. error=redefinition of table `transforms.elasticsearch_preprocess` for key `transforms.elasticsearch_preprocess` at line 139 column 1
       

      Generated vector config.

      cat vector.toml 
      # Logs from containers (including openshift containers)
      [sources.raw_container_logs]
      type = "kubernetes_logs"
      auto_partial_merge = true
      exclude_paths_glob_patterns = ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"][sources.raw_journal_logs]
      type = "journald"# Logs from host audit
      [sources.host_audit_logs]
      type = "file"
      ignore_older_secs = 600
      include = ["/var/log/audit/audit.log"]# Logs from kubernetes audit
      [sources.k8s_audit_logs]
      type = "file"
      ignore_older_secs = 600
      include = ["/var/log/kube-apiserver/audit.log"]# Logs from openshift audit
      [sources.openshift_audit_logs]
      type = "file"
      ignore_older_secs = 600
      include = ["/var/log/oauth-apiserver.audit.log"][transforms.container_logs]
      type = "remap"
      inputs = ["raw_container_logs"]
      source = '''
        level = "unknown"
        if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
          level = "warn"
        } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
          level = "info"
        } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
          level = "error"
        } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
          level = "debug"
        }
        .level = level  .pipeline_metadata.collector.name = "vector"
        .pipeline_metadata.collector.version = "0.14.1"
        ip4, err = get_env_var("NODE_IPV4")
        .pipeline_metadata.collector.ipaddr4 = ip4
        received, err = format_timestamp(now(),"%+")
        .pipeline_metadata.collector.received_at = received
        .pipeline_metadata.collector.error = err
       '''[transforms.journal_logs]
      type = "remap"
      inputs = ["raw_journal_logs"]
      source = '''
        level = "unknown"
        if match(.message,r'(Warning|WARN|W[0-9]+|level=warn|Value:warn|"level":"warn")'){
          level = "warn"
        } else if match(.message, r'Info|INFO|I[0-9]+|level=info|Value:info|"level":"info"'){
          level = "info"
        } else if match(.message, r'Error|ERROR|E[0-9]+|level=error|Value:error|"level":"error"'){
          level = "error"
        } else if match(.message, r'Debug|DEBUG|D[0-9]+|level=debug|Value:debug|"level":"debug"'){
          level = "debug"
        }
        .level = level  .pipeline_metadata.collector.name = "vector"
        .pipeline_metadata.collector.version = "0.14.1"
        ip4, err = get_env_var("NODE_IPV4")
        .pipeline_metadata.collector.ipaddr4 = ip4
        received, err = format_timestamp(now(),"%+")
        .pipeline_metadata.collector.received_at = received
        .pipeline_metadata.collector.error = err
       '''
      [transforms.route_container_logs]
      type = "route"
      inputs = ["container_logs"]
      route.app = '!(starts_with!(.kubernetes.pod_namespace,"kube") && starts_with!(.kubernetes.pod_namespace,"openshift") && .kubernetes.pod_namespace == "default")'
      route.infra = 'starts_with!(.kubernetes.pod_namespace,"kube") || starts_with!(.kubernetes.pod_namespace,"openshift") || .kubernetes.pod_namespace == "default"'
      # Rename log stream to "application"
      [transforms.application]
      type = "remap"
      inputs = ["route_container_logs.app"]
      source = """
      .log_type = "app"
      """
      # Rename log stream to "infrastructure"
      [transforms.infrastructure]
      type = "remap"
      inputs = ["route_container_logs.infra","journal_logs"]
      source = """
      .log_type = "infra"
      """
      # Rename log stream to "audit"
      [transforms.audit]
      type = "remap"
      inputs = ["host_audit_logs","k8s_audit_logs","openshift_audit_logs"]
      source = """
      .log_type = "audit"
      """
      [transforms.forward-to-external-es]
      type = "remap"
      inputs = ["application","audit","infrastructure"]
      source = """
      .
      """
      # Adding _id field
      [transforms.elasticsearch_preprocess]
      type = "remap"
      inputs = ["forward-to-external-es"]
      source = """
      ._id = encode_base64(uuid_v4())
      """[sinks.es_created_by_user]
      type = "elasticsearch"
      inputs = ["elasticsearch_preprocess"]
      endpoint = "http://elasticsearch-server.aosqe-es.svc:9200"
      index = "{{ log_type }}-write"
      request.timeout_secs = 2147483648
      bulk_action = "create"
      id_key = "_id"# Adding _id field
      [transforms.elasticsearch_preprocess]
      type = "remap"
      inputs = ["forward-to-external-es"]
      source = """
      ._id = encode_base64(uuid_v4())
      """[sinks.default]
      type = "elasticsearch"
      inputs = ["elasticsearch_preprocess"]
      endpoint = "https://elasticsearch.openshift-logging.svc:9200"
      index = "{{ log_type }}-write"
      request.timeout_secs = 2147483648
      bulk_action = "create"
      id_key = "_id"
      # TLS Config
      [sinks.default.tls]
      key_file = "/var/run/ocp-collector/secrets/collector/tls.key"
      crt_file = "/var/run/ocp-collector/secrets/collector/tls.crt"
      ca_file = "/var/run/ocp-collector/secrets/collector/ca-bundle.crt" 

              aguptaredhat Ajay Gupta (Inactive)
              rhn-support-ikanse Ishwar Kanse
              Ishwar Kanse Ishwar Kanse
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

                Created:
                Updated:
                Resolved: