Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-9321

Transform Request Validations

XMLWordPrintable

    • False
    • Hide

      None

      Show
      None
    • False

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Postgres

      What is the connector configuration?

      {
        "config":
        {
          "connector.class": "PostgresCdcSourceV2",
          "name": "PostgresCdcSourceConnector_Kalla",
          "kafka.auth.mode": "KAFKA_API_KEY",
          "kafka.api.key": "XXXX",
          "kafka.api.secret": "XXXX",
          "database.hostname": "XXXXX",
          "database.port": "5432",
          "database.user": "XXXXXX",
          "database.password": "XXXXX",
          "database.dbname": "XXXXX",
          "database.server.name": "testdbserver1",
          "database.sslmode": "disable",
          "table.include.list": "public.employee",
          "publication.name": "debezium_partition",
          "output.data.format": "JSON",
          "output.key.format": "JSON",
          "slot.name": "_slot",
          "topic.prefix": "asgard",
          "tasks.max": "1",
          "transforms": "custom_unwrap",
          "transforms.custom_unwrap.drop.tombstones": "false",
          "transforms.custom_unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
          "transforms.custom_unwrap.delete.handling.mode": "jbsdfkjsd"
        }
      } 

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      <Your answer>

      What behavior do you expect?

      Expect validation on ENUM values while connector creation for transform configs.

      What behavior do you see?

      It creates the connector even with false values and fails post creation basically it does not do validation here
      https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java#L642 as we pass null values here https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/config/Field.java#L456-L475

      and ConfigDef.Validator type is req

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      <Your answer>

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.\n\tat org.apache.kafka.connect.runtime.ConnectorConfig.transformationStages(ConnectorConfig.java:345)\n\tat org.apache.kafka.connect.runtime.Worker$TaskBuilder.build(Worker.java:2138)\n\tat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:785)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:870)\n\tat org.apache.kafka.connect.runtime.Worker.startSourceTask(Worker.java:682)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:2110)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$restartTask$29(DistributedHerder.java:1540)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2453)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:504)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:389)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Unable to validate config.\n\tat io.debezium.transforms.ExtractNewRecordState.configure(ExtractNewRecordState.java:131)\n\tat org.apache.kafka.connect.runtime.ConnectorConfig.transformationStages(ConnectorConfig.java:325)\n\t... 14 more\n

      The 'delete.handling.mode' value '' is invalid: Value must be one of drop, none, rewrite

      How to reproduce the issue using our tutorial deployment?

       

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      I am thinking of implementing the ConfigDef.Validator of kakfa library for EumRecommender validator in debezium basically I have implemented ensure Valid method so that it can be validate for transforms here 

      public static class EnumRecommender<T extends Enum<T>> implements Recommender, Validator, ConfigDef.Validator {        private final List<Object> validValues;
              private final java.util.Set<String> literals;
              private final String literalsStr;
              private final String defaultOption;
              public EnumRecommender(Class<T> enumType, T defaultOption) {
                  // Not all enums support EnumeratedValue yet
                  this.literals = getEnumLiterals(enumType);
                  this.validValues = Collections.unmodifiableList(new ArrayList<>(this.literals));
                  this.literalsStr = Strings.join(", ", validValues);
                  this.defaultOption = defaultOption != null ? defaultOption.name().toLowerCase() : null;
              }
              @Override
              public List<Object> validValues(Field field, Configuration config) {
                  return validValues;
              }
              @Override
              public boolean visible(Field field, Configuration config) {
                  return true;
              }
              @Override
              public int validate(Configuration config, Field field, ValidationOutput problems) {
                  String value = config.getString(field);
                  if (value == null) {
                      if (defaultOption != null) {
                          problems.accept(field, value, "Value must be one of " + literalsStr);
                          return 1;
                      }
                      return 0;
                  }
                  String trimmed = value.trim().toLowerCase();
                  if (!literals.contains(trimmed)) {
                      problems.accept(field, value, "Value must be one of " + literalsStr);
                      return 1;
                  }
                  return 0;
              }
              @Override
              public void ensureValid(String name, Object value) {
                  if (value == null) {
                      if (defaultOption != null) {
                          throw new ConfigException(name, value, "Value must be one of " + literalsStr);
                      }
                      return;
                  }
                  String trimmed = value.toString().trim().toLowerCase();
                  if (!literals.contains(trimmed)) {
                      throw new ConfigException(name, value, "Value must be one of " + literalsStr);
                  }
              }
          } 

      Sample results 

      Implementation ideas (optional)

      <Your answer>

              Unassigned Unassigned
              skalla Shubham Kalla (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: