-
Bug
-
Resolution: Won't Do
-
Major
-
None
-
1.0.0.Final
-
None
We have an issue when the debezium connector try to push a message on a topic.
If the topic doesn't exists, or if debezium can not access to the topic, it crash and new change will not be send.
We have the issue using `outboxEventRouter` and dynamic topics name.
The connector configuration is :
{ "name": "connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "mysecretpassword", "database.dbname" : "postgres", "database.server.name": "db", "table.whitelist": "event", "plugin.name": "wal2json", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "io.debezium.converters.ByteBufferConverter", "key.converter.schemas.enable":"false", "value.converter.schemas.enable": "false", "poll.interval.ms": 300, "transforms" : "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.payload": "event", "transforms.outbox.table.field.event.payload.id": "id", "transforms.outbox.table.field.event.type": "event_type", "transforms.outbox.table.field.event.key": "id", "transforms.outbox.route.by.field": "topic", "transforms.outbox.route.topic.regex": "(?<routedByValue>.*)", "transforms.outbox.route.topic.replacement": "${routedByValue}", "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,correlation_id:header:b3" } }
stack trace sample:
| 2020-02-04 13:49:28,277 ERROR \|\| [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Topic authorization failed for topics [technical-event] [org.apache.kafka.clients.Metadata] | 2020-02-04 13:49:28,278 ERROR \|\| WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} failed to send record to technical-event: [org.apache.kafka.connect.runtime.WorkerSourceTask] | org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [technical-event] | 2020-02-04 13:49:28,283 INFO \|\| WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask] | 2020-02-04 13:49:28,283 INFO \|\| WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} flushing 1 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask] | 2020-02-04 13:49:33,284 ERROR \|\| WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages [org.apache.kafka.connect.runtime.WorkerSourceTask] | 2020-02-04 13:49:33,284 ERROR \|\| WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask] | org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback | at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:258) | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:312) | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240) | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) | at java.base/java.lang.Thread.run(Thread.java:834) | Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [technical-event] | 2020-02-04 13:49:33,285 ERROR \|\| WorkerSourceTask{id=tempo-orchestrator-cdc-uat-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask] | 2020-02-04 13:49:33,286 INFO Postgres\|adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONNECT\|postgres-connector-task Finished streaming [io.debezium.pipeline.ChangeEventSourceCoordinator] | 2020-02-04 13:49:33,286 INFO Postgres\|adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONNECT\|postgres-connector-task Connected metrics set to 'false' [io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics] | 2020-02-04 13:49:33,289 INFO \|\| [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer] | 2020-02-04 13:49:33,289 ERROR \|\| [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Interrupted while joining ioThread [org.apache.kafka.clients.producer.KafkaProducer] | java.lang.InterruptedException | at java.base/java.lang.Object.wait(Native Method) | at java.base/java.lang.Thread.join(Thread.java:1313) | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199) | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176) | at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158) | at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156) | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183) | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) | at java.base/java.lang.Thread.run(Thread.java:834) | 2020-02-04 13:49:33,289 INFO \|\| [Producer clientId=connector-producer-tempo-orchestrator-cdc-uat-0] Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer] | 2020-02-04 13:49:33,290 WARN \|\| Could not close producer [org.apache.kafka.connect.runtime.WorkerSourceTask] | org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1201) | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1176) | at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:158) | at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156) | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183) | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) | at java.base/java.lang.Thread.run(Thread.java:834) | Caused by: java.lang.InterruptedException | at java.base/java.lang.Object.wait(Native Method) | at java.base/java.lang.Thread.join(Thread.java:1313) | at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1199) | ... 10 more | 2020-02-04 13:50:23,666 INFO \|\| Kafka Connect stopping [org.apache.kafka.connect.runtime.Connect] | 2020-02-04 13:50:23,666 INFO \|\| Stopping REST server [org.apache.kafka.connect.runtime.rest.RestServer] | 2020-02-04 13:50:23,672 INFO \|\| Stopped http_10.131.19.368083@c808207{HTTP/1.1,[http/1.1]}{10.131.19.36:8083} [org.eclipse.jetty.server.AbstractConnector] | 2020-02-04 13:50:23,672 INFO \|\| node0 Stopped scavenging [org.eclipse.jetty.server.session] | 2020-02-04 13:50:23,675 INFO \|\| REST server stopped [org.apache.kafka.connect.runtime.rest.RestServer] | 2020-02-04 13:50:23,675 INFO \|\| [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Herder stopping [org.apache.kafka.connect.runtime.distributed.DistributedHerder] | 2020-02-04 13:50:23,675 INFO \|\| [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Stopping connectors and tasks that are still assigned to this worker. [org.apache.kafka.connect.runtime.distributed.DistributedHerder] | 2020-02-04 13:50:23,680 INFO \|\| Stopping connector tempo-orchestrator-cdc-uat [org.apache.kafka.connect.runtime.Worker] | 2020-02-04 13:50:23,680 INFO \|\| Stopping task tempo-orchestrator-cdc-uat-0 [org.apache.kafka.connect.runtime.Worker] | 2020-02-04 13:50:23,681 INFO \|\| Stopped connector tempo-orchestrator-cdc-uat [org.apache.kafka.connect.runtime.Worker] | 2020-02-04 13:50:23,683 INFO \|\| [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Member connect-1-970e16dd-eb30-43b0-8f6a-adca6e878a30 sending LeaveGroup request to coordinator b5-pkc-l5mw2.europe-west1.gcp.confluent.cloud:9092 (id: 2147483642 rack: null) due to the consumer is being closed [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] | 2020-02-04 13:50:23,684 WARN \|\| [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Close timed out with 1 pending requests to coordinator, terminating client connections [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] | 2020-02-04 13:50:23,687 INFO \|\| Stopping KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-STATUS [org.apache.kafka.connect.util.KafkaBasedLog] | 2020-02-04 13:50:23,688 INFO \|\| [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [org.apache.kafka.clients.producer.KafkaProducer] | 2020-02-04 13:50:23,705 INFO \|\| Stopped KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-STATUS [org.apache.kafka.connect.util.KafkaBasedLog] | 2020-02-04 13:50:23,705 INFO \|\| Closing KafkaConfigBackingStore [org.apache.kafka.connect.storage.KafkaConfigBackingStore] | 2020-02-04 13:50:23,705 INFO \|\| Stopping KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONFIGS [org.apache.kafka.connect.util.KafkaBasedLog] | 2020-02-04 13:50:23,705 INFO \|\| [Producer clientId=producer-3] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [org.apache.kafka.clients.producer.KafkaProducer] | 2020-02-04 13:50:23,710 INFO \|\| Stopped KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-CONFIGS [org.apache.kafka.connect.util.KafkaBasedLog] | 2020-02-04 13:50:23,710 INFO \|\| Closed KafkaConfigBackingStore [org.apache.kafka.connect.storage.KafkaConfigBackingStore] | 2020-02-04 13:50:23,710 INFO \|\| Worker stopping [org.apache.kafka.connect.runtime.Worker] | 2020-02-04 13:50:23,710 INFO \|\| Stopping KafkaOffsetBackingStore [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] | 2020-02-04 13:50:23,710 INFO \|\| Stopping KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-OFFSETS [org.apache.kafka.connect.util.KafkaBasedLog] | 2020-02-04 13:50:23,711 INFO \|\| [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. [org.apache.kafka.clients.producer.KafkaProducer] | 2020-02-04 13:50:23,717 INFO \|\| Stopped KafkaBasedLog for topic adeo-staging-europe-west1-CONNECT-TEMPO-GRP-GRP-P3-C3-TOR-OFFSETS [org.apache.kafka.connect.util.KafkaBasedLog] | 2020-02-04 13:50:23,717 INFO \|\| Stopped KafkaOffsetBackingStore [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] | 2020-02-04 13:50:23,718 INFO \|\| Worker stopped [org.apache.kafka.connect.runtime.Worker] | 2020-02-04 13:50:23,718 INFO \|\| [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Herder stopped [org.apache.kafka.connect.runtime.distributed.DistributedHerder] | 2020-02-04 13:50:23,719 INFO \|\| [Worker clientId=connect-1, groupId=svc-tempo-orchestrator-app-connect-uat] Herder stopped [org.apache.kafka.connect.runtime.distributed.DistributedHerder] | 2020-02-04 13:50:23,719 INFO \|\| Kafka Connect stopped [org.apache.kafka.connect.runtime.Connect]