  1. Debezium
  2. DBZ-2992

Connector configuration error with transform and create topics options

Details

    • Bug
    • Resolution: Duplicate
    • Major
    • 1.4
    • Apply the connector configuration from the example below to a running debezium/connect:1.4 instance

    Description

      A connector configuration that combines "transforms" and "topic.creation" options fails.
      The REST API returns OK when the configuration is applied, but the following exception appears in the Debezium logs and the transforms do not work:

      org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.route.type'
              at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:296)
              at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:605)
              at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.route.type'
              at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:159)
              at org.apache.kafka.connect.runtime.SourceConnectorConfig$EnrichedSourceConnectorConfig.get(SourceConnectorConfig.java:57)
              at org.apache.kafka.connect.runtime.SourceConnectorConfig.get(SourceConnectorConfig.java:141)
              at org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:216)
              at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:281)
              ... 10 more
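
Because the REST call reports success while the task fails afterwards, the problem is visible only in the logs or in the connector status. The following is a minimal sketch of listing failed tasks through the Kafka Connect REST endpoint GET /connectors/{name}/status. The base URL and the connector name are hypothetical placeholders, and it is an assumption that this particular failure is reported as a FAILED task state with a trace.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default Kafka Connect REST port
CONNECTOR = "my-connector"             # hypothetical connector name


def failed_tasks(status):
    """Return (task id, first trace line) for every FAILED task in a status document."""
    result = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace") or ""
            first_line = trace.splitlines()[0] if trace else ""
            result.append((task.get("id"), first_line))
    return result


def fetch_status(base_url=CONNECT_URL, name=CONNECTOR):
    """Fetch the status document of one connector from the Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


if __name__ == "__main__":
    for task_id, reason in failed_tasks(fetch_status()):
        print(task_id, reason)
```

Checking the status after every configuration change would catch this kind of error even though the configuration request itself succeeds.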
      

      Configuration example:

      {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "plugin.name": "pgoutput",
          "database.hostname": "db",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname" : "my-db",
          "database.server.name": "my",
          "slot.name": "dbz_my",
          "publication.name": "dbz_publication",
          "table.include.list": "events.transactions",
        
          "tasks.max": "1",
          "value.converter.schemas.enable": "false",
          "key.converter.schemas.enable": "false",
          "tombstones.on.delete": "false",
          "heartbeat.interval.ms": "30000",
        
          "message.key.columns": "events.transactions:trx_id",
        
          "transforms": "route,createKey,createValue",
        
          "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
          "transforms.route.language": "jsr223.groovy",
          "transforms.route.topic.expression": "\"$value.source.name-$value.after.model_name\".toString()",
          "transforms.route.topic.regex": "^.*\\.(events\\.transactions)$",
          
          "transforms.createKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
          "transforms.createKey.field":"trx_id",
          "transforms.createKey.predicate":"myTopics",
        
          "transforms.createValue.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
          "transforms.createValue.field":"after",
          "transforms.createValue.predicate":"myTopics",
        
          "predicates": "myTopics",
          "predicates.myTopics.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
          "predicates.myTopics.pattern": "^my-.*$",
        
          "topic.creation.groups": "events",
          "topic.creation.default.replication.factor": "1",
          "topic.creation.default.partitions": "3",
          "topic.creation.events.replication.factor": "1",
          "topic.creation.events.partitions": "3"
        }
      

      The same configuration applies normally when either the "topic.creation." block or the "transforms." block is removed.
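
The two working variants can be derived from the failing configuration by dropping one block of keys at a time. This is a minimal sketch on an abbreviated subset of the configuration above. The helper name drop_block is hypothetical, and whether the "predicates" keys were also removed in the reporter's test is not stated, so they are left untouched here.

```python
def drop_block(config, block):
    """Return a copy of config without the key `block` and all `block.`-prefixed keys."""
    return {
        key: value
        for key, value in config.items()
        if key != block and not key.startswith(block + ".")
    }


# Abbreviated subset of the failing configuration, for illustration only.
config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "transforms": "route",
    "transforms.route.type": "io.debezium.transforms.ContentBasedRouter",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.partitions": "3",
}

# Variant 1: transforms kept, topic creation options removed.
no_topic_creation = drop_block(config, "topic.creation")

# Variant 2: topic creation options kept, transforms removed.
no_transforms = drop_block(config, "transforms")
```

Applying each variant separately helps confirm that only the combination of both blocks triggers the exception.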

            People

              Assignee: Unassigned
              Reporter: Aleksey Dergachev (alecx.sitex) (Inactive)