Debezium / DBZ-2573

Unknown configuration 'transforms.<name>.type'


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • 1.3.0.Beta2
    • mysql-connector

      Create a payload.json file with:

      {
        "name": "cluster98",
        "config": {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "database.hostname": "xx.xx.xx.xx",
          "database.port": "3306",
          "database.user": "debezium",
          "database.password": "xxxx",
          "database.server.name": "cluster98",
          "database.history.kafka.bootstrap.servers": "node1:9092,node2:9092,node3:9092",
          "database.history.kafka.topic": "debezium.cluster98",
          "include.schema.changes": "true",
          "database.exclude.list": "information_schema,mysql,mysql_innodb_cluster_metadata,performance_schema,sys,xxx",
          "table.exclude.list": "(.*)\\.migrations,(.*)\\.migrations_admin",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://node1:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://node1:8081",
          "sanitize.field.names": "true",
          "transforms": "reroute",
          "transforms.reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
          "transforms.reroute.topic.regex": "cluster(.*)\\.(.*)\\.(.*)",
          "transforms.reroute.topic.replacement": "$2.$3",
          "topic.creation.default.replication.factor": 3,
          "topic.creation.default.partitions": 16,
          "topic.creation.default.cleanup.policy": "compact",
          "topic.creation.default.compression.type": "lz4"
        }
      }
      

      Replace the placeholders with test values (IP addresses, passwords, etc.).

      Then call the Kafka Connect REST API:

      curl -s -H "Accept: application/json" -H "Content-Type: application/json" -XPOST -d @payload.json http://localhost:8083/connectors | jq
      

      Check the connector status:

      curl -s -XGET http://localhost:8083/connectors/cluster98/status | jq
      

      With Kafka 2.6.0 and Debezium 1.3.0.Beta2, topics can be created automatically with a custom replication factor, compression type, etc. However, enabling this seems to break the topic routing feature.
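For reference, here is a minimal sketch of what the reroute settings in the payload are expected to do to topic names once the transform loads. It is not Debezium's implementation: it assumes the router only rewrites a topic when the regex matches the whole name, and it writes the Java replacement `$2.$3` in Python's backreference syntax. The database and table names (`inventory`, `customers`) are hypothetical.

```python
import re

# Values copied from the connector config (JSON escaping removed).
TOPIC_REGEX = r"cluster(.*)\.(.*)\.(.*)"
# Java's "$2.$3" written in Python's backreference syntax.
TOPIC_REPLACEMENT = r"\2.\3"


def reroute(topic):
    """Return the rerouted topic name, or the original name if the regex
    does not match the whole topic (assumed full-match behavior)."""
    if re.fullmatch(TOPIC_REGEX, topic) is None:
        return topic
    return re.sub(TOPIC_REGEX, TOPIC_REPLACEMENT, topic, count=1)


if __name__ == "__main__":
    # Hypothetical table topic: <server name>.<database>.<table>
    print(reroute("cluster98.inventory.customers"))
    # A topic with no dots does not match and is left unchanged.
    print(reroute("cluster98"))
```

Under these assumptions, `cluster98.inventory.customers` becomes `inventory.customers`, while a topic named just `cluster98` keeps its name.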

      The connector can be created but fails to start with the following trace:

      # curl -s -XGET http://localhost:8083/connectors/cluster98/status | jq -r ".tasks[].state, .tasks[].trace"
      FAILED
      org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.reroute.type'
              at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:296)
              at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:605)
              at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
              at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.reroute.type'
              at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:159)
              at org.apache.kafka.connect.runtime.SourceConnectorConfig$EnrichedSourceConnectorConfig.get(SourceConnectorConfig.java:57)
              at org.apache.kafka.connect.runtime.SourceConnectorConfig.get(SourceConnectorConfig.java:141)
              at org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:216)
              at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:281)
              ... 10 more
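The same check can be scripted when several connectors need to be inspected. The sketch below is only an illustration: it assumes the status response has a `tasks` array whose entries carry `id`, `state` and, for failed tasks, `trace`, as the jq filter above implies. The sample response is abbreviated from the trace in this report.

```python
def failed_tasks(status):
    """Return (task id, first line of the trace) for every FAILED task."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task.get("id"), first_line))
    return failures


if __name__ == "__main__":
    # Abbreviated sample of a status response; the task id is an assumption.
    sample = {
        "name": "cluster98",
        "tasks": [
            {
                "id": 0,
                "state": "FAILED",
                "trace": (
                    "org.apache.kafka.connect.errors.ConnectException: "
                    "org.apache.kafka.common.config.ConfigException: "
                    "Unknown configuration 'transforms.reroute.type'\n"
                    "\tat org.apache.kafka.connect.runtime.ConnectorConfig"
                    ".transformations(ConnectorConfig.java:296)"
                ),
            }
        ],
    }
    for task_id, reason in failed_tasks(sample):
        print(task_id, reason)
```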
      

      The connector starts when the payload contains either the transforms settings or the topic.creation.default settings, but not both at the same time.
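To confirm this observation, the two working variants can be derived from the failing payload by dropping one group of settings at a time. This is a minimal sketch, not part of the original report: the helper name and the trimmed sample payload are illustrative, and the prefixes are taken from the keys in payload.json.

```python
import copy
import json

TRANSFORM_PREFIX = "transforms"
TOPIC_CREATION_PREFIX = "topic.creation."


def without_prefix(payload, prefix):
    """Return a deep copy of the payload whose config omits every key
    starting with the given prefix."""
    variant = copy.deepcopy(payload)
    variant["config"] = {
        key: value
        for key, value in variant["config"].items()
        if not key.startswith(prefix)
    }
    return variant


if __name__ == "__main__":
    # Trimmed copy of the payload from this report; load payload.json
    # with json.load() to work on the full configuration instead.
    payload = {
        "name": "cluster98",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "transforms": "reroute",
            "transforms.reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
            "topic.creation.default.replication.factor": 3,
            "topic.creation.default.partitions": 16,
        },
    }
    # Variant 1: routing only. Variant 2: topic creation only.
    print(json.dumps(without_prefix(payload, TOPIC_CREATION_PREFIX), indent=2))
    print(json.dumps(without_prefix(payload, TRANSFORM_PREFIX), indent=2))
```

Both variants keep the connector name, so the previous connector has to be deleted (or the name changed) before posting the next variant.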

            Assignee: Unassigned
            Reporter: Julien Riou (julien.riou) (Inactive)