-
Enhancement
-
Resolution: Obsolete
-
Minor
-
None
-
False
-
-
False
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Postgres
What is the connector configuration?
{
"config":
{
"connector.class": "PostgresCdcSourceV2",
"name": "PostgresCdcSourceConnector_Kalla",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "XXXX",
"kafka.api.secret": "XXXX",
"database.hostname": "XXXXX",
"database.port": "5432",
"database.user": "XXXXXX",
"database.password": "XXXXX",
"database.dbname": "XXXXX",
"database.server.name": "testdbserver1",
"database.sslmode": "disable",
"table.include.list": "public.employee",
"publication.name": "debezium_partition",
"output.data.format": "JSON",
"output.key.format": "JSON",
"slot.name": "_slot",
"topic.prefix": "asgard",
"tasks.max": "1",
"transforms": "custom_unwrap",
"transforms.custom_unwrap.drop.tombstones": "false",
"transforms.custom_unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.custom_unwrap.delete.handling.mode": "jbsdfkjsd"
}
}
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
<Your answer>
What behavior do you expect?
Expect validation on ENUM values while connector creation for transform configs.
What behavior do you see?
It creates the connector even with false values and fails post creation basically it does not do validation here
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L642 as we pass null values here https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/config/Field.java#L456-L475
and ConfigDef.Validator type is req
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
<Your answer>
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.\n\tat org.apache.kafka.connect.runtime.ConnectorConfig.transformationStages(ConnectorConfig.java:345)\n\tat org.apache.kafka.connect.runtime.Worker$TaskBuilder.build(Worker.java:2138)\n\tat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:785)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:870)\n\tat org.apache.kafka.connect.runtime.Worker.startSourceTask(Worker.java:682)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:2110)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$29(DistributedHerder.java:1540)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2453)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:504)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:389)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.\n\tat io.debezium.transforms.ExtractNewRecordState.configure(ExtractNewRecordState.java:131)\n\tat org.apache.kafka.connect.runtime.ConnectorConfig.transformationStages(ConnectorConfig.java:325)\n\t... 14 more\n
The 'delete.handling.mode' value '' is invalid: Value must be one of drop, none, rewrite
How to reproduce the issue using our tutorial deployment?
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
I am thinking of implementing the ConfigDef.Validator of kakfa library for EumRecommender validator in debezium basically I have implemented ensure Valid method so that it can be validate for transforms here
public static class EnumRecommender<T extends Enum<T>> implements Recommender, Validator, ConfigDef.Validator { private final List<Object> validValues; private final java.util.Set<String> literals; private final String literalsStr; private final String defaultOption; public EnumRecommender(Class<T> enumType, T defaultOption) { // Not all enums support EnumeratedValue yet this.literals = getEnumLiterals(enumType); this.validValues = Collections.unmodifiableList(new ArrayList<>(this.literals)); this.literalsStr = Strings.join(", ", validValues); this.defaultOption = defaultOption != null ? defaultOption.name().toLowerCase() : null; } @Override public List<Object> validValues(Field field, Configuration config) { return validValues; } @Override public boolean visible(Field field, Configuration config) { return true; } @Override public int validate(Configuration config, Field field, ValidationOutput problems) { String value = config.getString(field); if (value == null) { if (defaultOption != null) { problems.accept(field, value, "Value must be one of " + literalsStr); return 1; } return 0; } String trimmed = value.trim().toLowerCase(); if (!literals.contains(trimmed)) { problems.accept(field, value, "Value must be one of " + literalsStr); return 1; } return 0; } @Override public void ensureValid(String name, Object value) { if (value == null) { if (defaultOption != null) { throw new ConfigException(name, value, "Value must be one of " + literalsStr); } return; } String trimmed = value.toString().trim().toLowerCase(); if (!literals.contains(trimmed)) { throw new ConfigException(name, value, "Value must be one of " + literalsStr); } } }
Sample results

Implementation ideas (optional)
<Your answer>
