Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-9666

ConnectorContext toString() - should sensitive configuration be masked?

XMLWordPrintable

    • False
    • Hide

      None

      Show
      None
    • False

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-mysql 3.2.2.Final

      What behavior do you see?

      Hi everyone,

       

      We noticed that the DebeziumOpenLineageEmitter.getEmitter() method logs the full ConnectorContext in exception messages, which could expose sensitive configuration data (passwords, secrets,authentication tokens) in application logs 

       

      Affected Code:

        // DebeziumOpenLineageEmitter.java
        private static LineageEmitter getEmitter(ConnectorContext connectorContext) {
            LineageEmitter emitter = emitters.get(connectorContext.toEmitterKey());
            LOGGER.debug("Available emitters {}", emitters);
            if (emitter == null) {
                throw new IllegalStateException("DebeziumOpenLineageEmitter not initialized for connector " + connectorContext + ". Call init() first.");
            }
            return emitter;
      }
       
      

        

      Stack Trace:

       java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=my-connector, connectorName=mysql, taskId=0,
        version=3.2.2.Final, config={connector.class=io.debezium.connector.mysql.MySqlConnector, signal.consumer.sasl.mechanism=PLAIN,
        consumer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testUser1" password="testPassword1";,
        schema.history.internal.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testUser2" password="testPassword2";,
        database.password=testPassword3, …}]. Call init() first.
            at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:158)
            at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:108)
            at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:263)
            at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:291)
            at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:205)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:265)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:322)
            at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83)
            at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:356)
            at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
            at java.base/java.lang.Thread.run(Thread.java:1583) 

       Security Risk:

       When the exception is thrown, connectorContext.toString() includes the full configuration map, potentially logging:

        - database.password

        - consumer.override.sasl.jaas.config

        - schema.registry.auth.client-secret

        - ssl.truststore.password

        - consumer.secret

        - basic.auth.user.info

       

       How to Reproduce:

        @Test
        public void testConnectorContextToStringLeaksSensitiveData() {
            Map<String, String> config = new HashMap<>();
            config.put("database.hostname", "localhost");
            config.put("database.password", "secretPassword123");
            config.put("consumer.override.sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass123\";");
            config.put("schema.registry.auth.client-secret", "mySecret456");
       
            ConnectorContext context = new ConnectorContext("test-connector", "mysql", "0", "3.2.0.Final", config);
            String contextString = context.toString();
       
            // FAILS: Sensitive data is exposed in toString()
            assertFalse(contextString.contains("secretPassword123"));
            assertFalse(contextString.contains("pass123"));
            assertFalse(contextString.contains("mySecret456"));
        }
      

       

      Implementation ideas (optional)

      Possible solution:

      I'm not familiar with the codebase, but one approach could be to implement ConnectorContext.toString() to mask sensitive configuration keys matching a pattern similar to PASSWORD_PATTERN used in the Debezium core Configuration class here

       

      Pattern to match:

      .*secret$|.*password$|.*sasl\.jaas\.config$|.*basic\.auth\.user\.info|.*registry\.auth\.client-secret

       

       

      Example Implementation:

      // ConnectorContext.java
      
      private static final Pattern PASSWORD_PATTERN = Pattern.compile(
              ".*secret$|.*password$|.*sasl\\.jaas\\.config$|.*basic\\.auth\\.user\\.info|.*registry\\.auth\\.client-secret",
              Pattern.CASE_INSENSITIVE);
      
      ...
      
      @Override
      public String toString() {
          return "ConnectorContext[connectorLogicalName=" + connectorLogicalName +
                  ", connectorName=" + connectorName +
                  ", taskId=" + taskId +
                  ", version=" + version +
                  ", config=" + maskSensitiveConfig(config) +
                  "]";
      }
      
      private static Map<String, String> maskSensitiveConfig(Map<String, String> config) {
          if (config == null) {
              return null;
          }
          return config.entrySet().stream()
                  .collect(Collectors.toMap(
                          Map.Entry::getKey,
                          e -> PASSWORD_PATTERN.matcher(e.getKey()).matches() ? "********" : e.getValue()
                  ));
      }
      
       
      

       

              Unassigned Unassigned
              archied archie d
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated: