Debezium / DBZ-2003

Kafka database history topic already exists but connector fails

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate Issue
    • Affects Version/s: 1.1.0.Final
    • Fix Version/s: None
    • Component/s: core-library
    • Labels: None

    Description

      When using Debezium with a secured Kafka cluster with limited ACLs, the connector will fail even if the database history topic was correctly created beforehand.
      Limited ACLs:

      • No ACL on the cluster resource.
      • All operations allowed on the topic resource, granted through prefixed or literal ACLs.

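      It appears that, whether or not the database history topic already exists, the code queries the Kafka broker configuration to retrieve the default replication factor. Judging by the ClusterAuthorizationException in the stack trace, that query needs access to the cluster resource, which these ACLs do not grant.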
      See: https://github.com/debezium/debezium/blob/v1.1.0.Final/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java#L355
      https://github.com/debezium/debezium/blob/v1.1.0.Final/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java#L396

      It should be possible to start a connector with ACLs only on the topic resource for the topic named in the database.history.kafka.topic configuration property.
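
      The expected ordering can be sketched as follows. This is only an illustration of the control flow requested here, not Debezium's actual code: TopicAdmin, FakeAdmin, and the topic name dbhistory.inventory are hypothetical stand-ins, and the fake assumes that checking for the topic needs only topic-level permissions while reading the broker default needs the cluster resource.

```java
import java.util.HashSet;
import java.util.Set;

public class HistoryTopicInit {

    /** Hypothetical stand-in for the admin calls involved; not a Kafka or Debezium interface. */
    interface TopicAdmin {
        boolean topicExists(String topic);                        // assumed: topic-level ACL is enough
        short defaultReplicationFactor();                         // assumed: needs the cluster resource
        void createTopic(String topic, short replicationFactor);
    }

    /** In-memory fake that mimics a principal with topic ACLs and, optionally, a cluster ACL. */
    static class FakeAdmin implements TopicAdmin {
        final Set<String> topics = new HashSet<>();
        final boolean clusterAclGranted;
        int brokerConfigCalls = 0;

        FakeAdmin(boolean clusterAclGranted, String... existingTopics) {
            this.clusterAclGranted = clusterAclGranted;
            for (String t : existingTopics) {
                topics.add(t);
            }
        }

        public boolean topicExists(String topic) {
            return topics.contains(topic);
        }

        public short defaultReplicationFactor() {
            brokerConfigCalls++;
            if (!clusterAclGranted) {
                throw new IllegalStateException("Cluster authorization failed.");
            }
            return 1;
        }

        public void createTopic(String topic, short replicationFactor) {
            topics.add(topic);
        }
    }

    /** Returns true if the topic had to be created, false if it already existed. */
    static boolean initializeStorage(TopicAdmin admin, String topic) {
        if (admin.topicExists(topic)) {
            return false; // nothing to create, so the broker config is never read
        }
        // Only reached when the topic is missing; this is the call that needs cluster access.
        short replicationFactor = admin.defaultReplicationFactor();
        admin.createTopic(topic, replicationFactor);
        return true;
    }

    public static void main(String[] args) {
        // Topic created beforehand, no cluster ACL: initialization should succeed.
        FakeAdmin admin = new FakeAdmin(false, "dbhistory.inventory");
        boolean created = initializeStorage(admin, "dbhistory.inventory");
        System.out.println("created=" + created + ", brokerConfigCalls=" + admin.brokerConfigCalls);
    }
}
```

      With this ordering, a missing cluster ACL only matters when the topic really has to be created, which is the case the existing error message ("please create the topic manually") already covers.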

      Full stack trace:

      [2020-04-21 13:46:02,281] ERROR WorkerSourceTask{id=<redacted>} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
      org.apache.kafka.connect.errors.ConnectException: Creation of database history topic failed, please create the topic manually
          at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:365)
          at io.debezium.relational.HistorizedRelationalDatabaseSchema.initializeStorage(HistorizedRelationalDatabaseSchema.java:63)
          at io.debezium.connector.oracle.OracleConnectorTask.start(OracleConnectorTask.java:55)
          at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:104)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
          at java.lang.Thread.run(Thread.java:748)
      Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
          at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
          at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
          at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
          at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
          at io.debezium.relational.history.KafkaDatabaseHistory.getKafkaBrokerConfig(KafkaDatabaseHistory.java:403)
          at io.debezium.relational.history.KafkaDatabaseHistory.getDefaultTopicReplicationFactor(KafkaDatabaseHistory.java:371)
          at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:355)
          ... 11 more
      

            People

              Assignee: Unassigned
              Reporter: Dorian Gambin (doriangambin, Inactive)
              Votes: 0
              Watchers: 3
