  1. Debezium
  2. DBZ-5081

Debezium - don't stop connector if topic doesn't exist


    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major

      What Debezium connector do you use and what version?

      PostgreSQL connector, 1.9.0.Final

      What is the connector configuration?

      {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.sslmode": "require",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "heartbeat.interval.ms": "60000",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "false",
        "max.batch.size": "200",
        "max.poll.records": "200",
        "max.queue.size": "20240",
        "plugin.name": "pgoutput",
        "poll.interval.ms": "10",
        "sanitize.field.names": "false",
        "schema.include.list": "public",
        "slot.drop.on.stop": "false",
        "slot.name": "debezium_cdc_pgoutput",
        "snapshot.mode": "initial",
        "tasks.max": "1",
        "tombstones.on.delete": "false",
        "transforms": "outbox",
        "transforms.outbox.route.by.field": "topic",
        "transforms.outbox.route.tombstone.on.empty.payload": "false",
        "transforms.outbox.route.topic.regex": "(?<routedByValue>.*)",
        "transforms.outbox.route.topic.replacement": "${routedByValue}",
        "transforms.outbox.table.field.event.id": "id",
        "transforms.outbox.table.field.event.key": "customer_order_id",
        "transforms.outbox.table.field.event.payload": "event",
        "transforms.outbox.table.field.event.payload.id": "customer_order_id",
        "transforms.outbox.table.field.event.type": "event_type",
        "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,correlation_id:header:b3,bu_code:header:bu-code,app_source:header:app-source,x_datadog_trace_id:header:x-datadog-trace-id,x_datadog_parent_id:header:x-datadog-parent-id,x_datadog_sampling_priority:header:x-datadog-sampling-priority",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "value.converter": "io.debezium.converters.ByteBufferConverter",
        "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.delegate.converter.type.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "auto.create.topics.enable": "false",
        "topic.creation.enable": "false",
        "topic.creation.default.replication.factor": 1,
        "topic.creation.default.partitions": 10,
        "topic.creation.default.cleanup.policy": "compact",
        "topic.creation.default.compression.type": "lz4"
      }
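
      A side note on the topic-creation keys in this configuration: as far as I understand the Kafka documentation, `auto.create.topics.enable` is a broker property and `topic.creation.enable` is a Kafka Connect worker property, so setting them in the connector configuration may have no effect. That would be consistent with `maybeCreateTopic` still appearing in the stack trace further down. The sketch below only illustrates that check; `NON_CONNECTOR_KEYS` and `misplaced_keys` are hypothetical names, and the key-to-scope mapping is my assumption, not something Debezium reports.

```python
# Sketch only: the key-to-scope mapping is an assumption based on the
# Kafka documentation, not output from Debezium or Kafka Connect.
NON_CONNECTOR_KEYS = {
    "auto.create.topics.enable": "broker",
    "topic.creation.enable": "Connect worker",
}


def misplaced_keys(connector_config):
    """Return {key: scope} for keys that are normally set outside the connector config."""
    return {
        key: scope
        for key, scope in NON_CONNECTOR_KEYS.items()
        if key in connector_config
    }


# A trimmed copy of the configuration above.
config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "auto.create.topics.enable": "false",
    "topic.creation.enable": "false",
    "topic.creation.default.partitions": 10,
}

for key, scope in misplaced_keys(config).items():
    print(f"{key}: normally a {scope} setting")
```

      If that assumption holds, disabling topic creation would have to happen in the worker and broker configuration rather than here.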

      What is the captured database version and mode of deployment?

      PostgreSQL 12.10

      What behaviour do you expect?

      The connector should not stop when a topic is not found. It should skip the event and continue sending the following events.
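
      To make the expected behaviour concrete, here is a minimal sketch of the skip-and-continue semantics I have in mind. It is plain Python and does not use any Kafka or Debezium API; `route_events`, the event dictionaries, and the `orders` topic are hypothetical, while `test` is the topic name from the error in this report.

```python
import logging

logger = logging.getLogger("outbox-routing-sketch")


def route_events(events, existing_topics):
    """Split events into (sent, skipped) depending on whether the target topic exists."""
    sent, skipped = [], []
    for event in events:
        topic = event["topic"]
        if topic in existing_topics:
            sent.append(event)
        else:
            # Log and move on instead of failing the whole task.
            logger.warning("Topic %r not found, skipping event %s", topic, event["id"])
            skipped.append(event)
    return sent, skipped


events = [
    {"id": 1, "topic": "orders"},
    {"id": 2, "topic": "test"},
    {"id": 3, "topic": "orders"},
]
sent, skipped = route_events(events, existing_topics={"orders"})
print([e["id"] for e in sent])     # [1, 3]
print([e["id"] for e in skipped])  # [2]
```

      The point is that event 3 is still delivered even though event 2 could not be routed.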

      What behaviour do you see?

      The error is logged, and the connector task is stopped when the topic is not found and cannot be created.

      If I restart the connector, the error occurs again and the connector stops again.

      WorkerSourceTask{id=tempo-orchestrator-cdc-uat1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
       org.apache.kafka.connect.errors.ConnectException: Not authorized to describe topic(s) 'test' on the brokers -----
          at org.apache.kafka.connect.util.TopicAdmin.lambda$describeTopics$0(TopicAdmin.java:471)
          at java.base/java.util.HashMap.forEach(HashMap.java:1336)
          at org.apache.kafka.connect.util.TopicAdmin.describeTopics(TopicAdmin.java:459)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeCreateTopic(WorkerSourceTask.java:403)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:348)
          at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
       

       

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      How to reproduce the issue using our tutorial deployment?

      Hard to reproduce in a local environment: I can't configure Kafka to disable topic creation.

      To reproduce, send an event to a non-existent topic while topic creation is disabled in Kafka.

            Assignee: Unassigned
            Reporter: Thomas Deblock (deblockthomas62, Inactive)