Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1770

CDC connector crash when it try to push message on a topics without good permission

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Won't Do
    • Icon: Major Major
    • None
    • 1.0.0.Final
    • postgresql-connector
    • None

      We have an issue when the debezium connector try to push a message on a topic.
      If the topic doesn't exists, or if debezium can not access to the topic, it crash and new change will not be send.

      We have the issue using `outboxEventRouter` and dynamic topics name.

      The connector configuration is :

      {
        "name": "connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "mysecretpassword",
            "database.dbname" : "postgres",
            "database.server.name": "db",
            "table.whitelist": "event",
            "plugin.name": "wal2json",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "io.debezium.converters.ByteBufferConverter",
            "key.converter.schemas.enable":"false",
            "value.converter.schemas.enable": "false",
            "poll.interval.ms": 300,
            "transforms" : "outbox",
            "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
            "transforms.outbox.table.field.event.payload": "event",
            "transforms.outbox.table.field.event.payload.id": "id",
            "transforms.outbox.table.field.event.type": "event_type",
            "transforms.outbox.table.field.event.key": "id",
            "transforms.outbox.route.by.field": "topic",
            "transforms.outbox.route.topic.regex": "(?<routedByValue>.*)",
            "transforms.outbox.route.topic.replacement": "${routedByValue}",
            "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,correlation_id:header:b3"
        }
      }
      
      

      stack trace sample:

      | 2020-02-04 13:49:28,277 ERROR  \|\|  [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Topic authorization failed for topics [technical-event]   [org.apache.kafka.clients.Metadata]
        | 2020-02-04 13:49:28,278 ERROR  \|\|  WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} failed to send record to technical-event:   [org.apache.kafka.connect.runtime.WorkerSourceTask]
        | org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [technical-event]
        | 2020-02-04 13:49:28,283 INFO   \|\|  WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
        | 2020-02-04 13:49:28,283 INFO   \|\|  WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} flushing 1 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
        | 2020-02-04 13:49:33,284 ERROR  \|\|  WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages   [org.apache.kafka.connect.runtime.WorkerSourceTask]
        | 2020-02-04 13:49:33,284 ERROR  \|\|  WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
        | org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
        | at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:258)
        | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:312)
        | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
        | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        | at java.base/java.lang.Thread.run(Thread.java:834)
        | Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [technical-event]
        | 2020-02-04 13:49:33,285 ERROR  \|\|  WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
        | 2020-02-04 13:49:33,286 INFO   Postgres\|adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONNECT\|postgres-connector-task  Finished streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
        | 2020-02-04 13:49:33,286 INFO   Postgres\|adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONNECT\|postgres-connector-task  Connected metrics set to 'false'   [io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics]
        | 2020-02-04 13:49:33,289 INFO   \|\|  [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Closing the Kafka producer with timeoutMillis = 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
        | 2020-02-04 13:49:33,289 ERROR  \|\|  [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Interrupted while joining ioThread   [org.apache.kafka.clients.producer.KafkaProducer]
        | java.lang.InterruptedException
        | at java.base/java.lang.Object.wait(Native Method)
        | at java.base/java.lang.Thread.join(Thread.java:1313)
        | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199)
        | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176)
        | at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158)
        | at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
        | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
        | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        | at java.base/java.lang.Thread.run(Thread.java:834)
        | 2020-02-04 13:49:33,289 INFO   \|\|  [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
        | 2020-02-04 13:49:33,290 WARN   \|\|  Could not close producer   [org.apache.kafka.connect.runtime.WorkerSourceTask]
        | org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
        | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1201)
        | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176)
        | at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158)
        | at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
        | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
        | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        | at java.base/java.lang.Thread.run(Thread.java:834)
        | Caused by: java.lang.InterruptedException
        | at java.base/java.lang.Object.wait(Native Method)
        | at java.base/java.lang.Thread.join(Thread.java:1313)
        | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199)
        | ... 10 more
        | 2020-02-04 13:50:23,666 INFO   \|\|  Kafka Connect stopping   [org.apache.kafka.connect.runtime.Connect]
        | 2020-02-04 13:50:23,666 INFO   \|\|  Stopping REST server   [org.apache.kafka.connect.runtime.rest.RestServer]
        | 2020-02-04 13:50:23,672 INFO   \|\|  Stopped http_10.131.19.368083@c808207{HTTP/1.1,[http/1.1]}{10.131.19.36:8083}   [org.eclipse.jetty.server.AbstractConnector]
        | 2020-02-04 13:50:23,672 INFO   \|\|  node0 Stopped scavenging   [org.eclipse.jetty.server.session]
        | 2020-02-04 13:50:23,675 INFO   \|\|  REST server stopped   [org.apache.kafka.connect.runtime.rest.RestServer]
        | 2020-02-04 13:50:23,675 INFO   \|\|  [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Herder stopping   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
        | 2020-02-04 13:50:23,675 INFO   \|\|  [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Stopping connectors and tasks that are still assigned to this worker.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
        | 2020-02-04 13:50:23,680 INFO   \|\|  Stopping connector tempo-orchestrator-cdc-uat   [org.apache.kafka.connect.runtime.Worker]
        | 2020-02-04 13:50:23,680 INFO   \|\|  Stopping task tempo-orchestrator-cdc-uat-0   [org.apache.kafka.connect.runtime.Worker]
        | 2020-02-04 13:50:23,681 INFO   \|\|  Stopped connector tempo-orchestrator-cdc-uat   [org.apache.kafka.connect.runtime.Worker]
        | 2020-02-04 13:50:23,683 INFO   \|\|  [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Member connect-1-970e16dd-eb30-43b0-8f6a-adca6e878a30 sending LeaveGroup request to coordinator b5-pkc-l5mw2.europe-west1.gcp.confluent.cloud:9092 (id: 2147483642 rack: null) due to the consumer is being closed   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
        | 2020-02-04 13:50:23,684 WARN   \|\|  [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Close timed out with 1 pending requests to coordinator, terminating client connections   [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
        | 2020-02-04 13:50:23,687 INFO   \|\|  Stopping KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-STATUS   [org.apache.kafka.connect.util.KafkaBasedLog]
        | 2020-02-04 13:50:23,688 INFO   \|\|  [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
        | 2020-02-04 13:50:23,705 INFO   \|\|  Stopped KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-STATUS   [org.apache.kafka.connect.util.KafkaBasedLog]
        | 2020-02-04 13:50:23,705 INFO   \|\|  Closing KafkaConfigBackingStore   [org.apache.kafka.connect.storage.KafkaConfigBackingStore]
        | 2020-02-04 13:50:23,705 INFO   \|\|  Stopping KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONFIGS   [org.apache.kafka.connect.util.KafkaBasedLog]
        | 2020-02-04 13:50:23,705 INFO   \|\|  [Producer clientId=producer-3] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
        | 2020-02-04 13:50:23,710 INFO   \|\|  Stopped KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONFIGS   [org.apache.kafka.connect.util.KafkaBasedLog]
        | 2020-02-04 13:50:23,710 INFO   \|\|  Closed KafkaConfigBackingStore   [org.apache.kafka.connect.storage.KafkaConfigBackingStore]
        | 2020-02-04 13:50:23,710 INFO   \|\|  Worker stopping   [org.apache.kafka.connect.runtime.Worker]
        | 2020-02-04 13:50:23,710 INFO   \|\|  Stopping KafkaOffsetBackingStore   [org.apache.kafka.connect.storage.KafkaOffsetBackingStore]
        | 2020-02-04 13:50:23,710 INFO   \|\|  Stopping KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-OFFSETS   [org.apache.kafka.connect.util.KafkaBasedLog]
        | 2020-02-04 13:50:23,711 INFO   \|\|  [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.   [org.apache.kafka.clients.producer.KafkaProducer]
        | 2020-02-04 13:50:23,717 INFO   \|\|  Stopped KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-OFFSETS   [org.apache.kafka.connect.util.KafkaBasedLog]
        | 2020-02-04 13:50:23,717 INFO   \|\|  Stopped KafkaOffsetBackingStore   [org.apache.kafka.connect.storage.KafkaOffsetBackingStore]
        | 2020-02-04 13:50:23,718 INFO   \|\|  Worker stopped   [org.apache.kafka.connect.runtime.Worker]
        | 2020-02-04 13:50:23,718 INFO   \|\|  [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Herder stopped   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
        | 2020-02-04 13:50:23,719 INFO   \|\|  [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Herder stopped   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
        | 2020-02-04 13:50:23,719 INFO   \|\|  Kafka Connect stopped   [org.apache.kafka.connect.runtime.Connect]
      

            Unassigned Unassigned
            deblockthomas62 Thomas Deblock (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: