-
Bug
-
Resolution: Unresolved
-
Major
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
debezium-connector-mysql 3.2.2.Final
What behavior do you see?
Hi everyone,
We noticed that the DebeziumOpenLineageEmitter.getEmitter() method includes the full ConnectorContext in its exception message, which can expose sensitive configuration data (passwords, secrets, authentication tokens) in application logs.
Affected Code:
// DebeziumOpenLineageEmitter.java
private static LineageEmitter getEmitter(ConnectorContext connectorContext) {
    LineageEmitter emitter = emitters.get(connectorContext.toEmitterKey());
    LOGGER.debug("Available emitters {}", emitters);
    if (emitter == null) {
        throw new IllegalStateException("DebeziumOpenLineageEmitter not initialized for connector " + connectorContext + ". Call init() first.");
    }
    return emitter;
}
Stack Trace:
java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=my-connector, connectorName=mysql, taskId=0, version=3.2.2.Final, config={connector.class=io.debezium.connector.mysql.MySqlConnector, signal.consumer.sasl.mechanism=PLAIN, consumer.override.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testUser1" password="testPassword1";, schema.history.internal.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="testUser2" password="testPassword2";, database.password=testPassword3, …}]. Call init() first.
    at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:158)
    at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:108)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:263)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:291)
    at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:322)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:356)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Security Risk:
When the exception is thrown, connectorContext.toString() includes the full configuration map, potentially logging:
- database.password
- consumer.override.sasl.jaas.config
- schema.registry.auth.client-secret
- ssl.truststore.password
- consumer.secret
- basic.auth.user.info
How to Reproduce:
@Test
public void testConnectorContextToStringLeaksSensitiveData() {
    Map<String, String> config = new HashMap<>();
    config.put("database.hostname", "localhost");
    config.put("database.password", "secretPassword123");
    config.put("consumer.override.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass123\";");
    config.put("schema.registry.auth.client-secret", "mySecret456");

    ConnectorContext context = new ConnectorContext("test-connector", "mysql", "0", "3.2.0.Final", config);
    String contextString = context.toString();

    // FAILS: Sensitive data is exposed in toString()
    assertFalse(contextString.contains("secretPassword123"));
    assertFalse(contextString.contains("pass123"));
    assertFalse(contextString.contains("mySecret456"));
}
Implementation ideas (optional)
Possible solution:
I'm not familiar with the codebase, but one approach could be to implement ConnectorContext.toString() so that it masks the values of sensitive configuration keys, using a pattern similar to the PASSWORD_PATTERN in the Debezium core Configuration class.
Pattern to match:
.*secret$|.*password$|.*sasl\.jaas\.config$|.*basic\.auth\.user\.info|.*registry\.auth\.client-secret
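To check which keys this pattern would treat as sensitive, here is a minimal, standalone sketch. The class name SensitiveKeys and the method isSensitive are hypothetical and not part of Debezium; the sketch assumes the pattern is applied with Matcher.matches(), as in the proposal, so the whole key has to match and the alternatives without a trailing $ behave the same as the anchored ones.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Debezium codebase.
final class SensitiveKeys {

    // The pattern proposed in this report, compiled case-insensitively.
    private static final Pattern SENSITIVE_KEY = Pattern.compile(
            ".*secret$|.*password$|.*sasl\\.jaas\\.config$|.*basic\\.auth\\.user\\.info|.*registry\\.auth\\.client-secret",
            Pattern.CASE_INSENSITIVE);

    private SensitiveKeys() {
    }

    // Returns true when the entire key matches one of the alternatives.
    static boolean isSensitive(String key) {
        return key != null && SENSITIVE_KEY.matcher(key).matches();
    }
}
```

Under these assumptions, all six keys listed under "Security Risk" are reported as sensitive, while a key such as database.hostname is not.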
Example Implementation:
// ConnectorContext.java
private static final Pattern PASSWORD_PATTERN = Pattern.compile(
        ".*secret$|.*password$|.*sasl\\.jaas\\.config$|.*basic\\.auth\\.user\\.info|.*registry\\.auth\\.client-secret",
        Pattern.CASE_INSENSITIVE);

...

@Override
public String toString() {
    return "ConnectorContext[connectorLogicalName=" + connectorLogicalName +
            ", connectorName=" + connectorName +
            ", taskId=" + taskId +
            ", version=" + version +
            ", config=" + maskSensitiveConfig(config) + "]";
}

private static Map<String, String> maskSensitiveConfig(Map<String, String> config) {
    if (config == null) {
        return null;
    }
    return config.entrySet().stream()
            .collect(Collectors.toMap(
                    Map.Entry::getKey,
                    e -> PASSWORD_PATTERN.matcher(e.getKey()).matches() ? "********" : e.getValue()
            ));
}
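One caveat with the stream-based version: Collectors.toMap throws a NullPointerException if any value is null, and it does not preserve the order of the entries. A possible variant, sketched here as a standalone class, uses a plain loop and a LinkedHashMap instead. The names ConfigMasker and mask are hypothetical; whether ConnectorContext can actually hold null config values is an assumption that has not been checked against the codebase.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical standalone helper; in Debezium this logic would live in ConnectorContext.
final class ConfigMasker {

    // Same pattern as in the proposal above.
    private static final Pattern SENSITIVE_KEY = Pattern.compile(
            ".*secret$|.*password$|.*sasl\\.jaas\\.config$|.*basic\\.auth\\.user\\.info|.*registry\\.auth\\.client-secret",
            Pattern.CASE_INSENSITIVE);

    private static final String MASK = "********";

    private ConfigMasker() {
    }

    // Returns a copy of the config with the values of sensitive keys replaced by a mask.
    // Null values are kept as-is and the iteration order of the input is preserved.
    static Map<String, String> mask(Map<String, String> config) {
        if (config == null) {
            return null;
        }
        Map<String, String> masked = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String key = entry.getKey();
            boolean sensitive = key != null && SENSITIVE_KEY.matcher(key).matches();
            masked.put(key, sensitive ? MASK : entry.getValue());
        }
        return masked;
    }
}
```

toString() could then print ConfigMasker.mask(config) in place of the raw map, so the original configuration is never modified.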
incorporates DBZ-9555 OpenLineage config not working with composite, and filter out debezium_config openlineage kafka message for security reasons (Open)