Type: Bug
Resolution: Unresolved
Priority: Major
What Debezium connector do you use and what version?
PostgreSQL connector, 1.9.0.Final
What is the connector configuration?
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.sslmode": "require",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "heartbeat.interval.ms": "60000",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "max.batch.size": "200",
  "max.poll.records": "200",
  "max.queue.size": "20240",
  "plugin.name": "pgoutput",
  "poll.interval.ms": "10",
  "sanitize.field.names": "false",
  "schema.include.list": "public",
  "slot.drop.on.stop": "false",
  "slot.name": "debezium_cdc_pgoutput",
  "snapshot.mode": "initial",
  "tasks.max": "1",
  "tombstones.on.delete": "false",
  "transforms": "outbox",
  "transforms.outbox.route.by.field": "topic",
  "transforms.outbox.route.tombstone.on.empty.payload": "false",
  "transforms.outbox.route.topic.regex": "(?<routedByValue>.*)",
  "transforms.outbox.route.topic.replacement": "${routedByValue}",
  "transforms.outbox.table.field.event.id": "id",
  "transforms.outbox.table.field.event.key": "customer_order_id",
  "transforms.outbox.table.field.event.payload": "event",
  "transforms.outbox.table.field.event.payload.id": "customer_order_id",
  "transforms.outbox.table.field.event.type": "event_type",
  "transforms.outbox.table.fields.additional.placement": "event_type:header:eventType,correlation_id:header:b3,bu_code:header:bu-code,app_source:header:app-source,x_datadog_trace_id:header:x-datadog-trace-id,x_datadog_parent_id:header:x-datadog-parent-id,x_datadog_sampling_priority:header:x-datadog-sampling-priority",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "value.converter": "io.debezium.converters.ByteBufferConverter",
  "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.delegate.converter.type.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "auto.create.topics.enable": "false",
  "topic.creation.enable": "false",
  "topic.creation.default.replication.factor": 1,
  "topic.creation.default.partitions": 10,
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.compression.type": "lz4"
}
What is the captured database version and mode of deployment?
PostgreSQL 12.10
What behaviour do you expect?
The connector should not stop when a topic is not found. It should skip the event and continue sending the following events.
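To make the expected behaviour concrete, here is a minimal, self-contained sketch of the skip-and-continue logic. It is only a simulation: `TopicNotAvailableError`, `send_all` and `fake_send` are hypothetical names made up for illustration and are not part of Debezium or Kafka Connect.

```python
class TopicNotAvailableError(Exception):
    """Hypothetical error: the target topic is missing and cannot be created."""


def send_all(events, send, log=print):
    """Send events in order; log and skip those whose topic is unavailable."""
    skipped = []
    for event in events:
        try:
            send(event)
        except TopicNotAvailableError as exc:
            # Expected behaviour: log the failure and move on to the next event.
            log(f"skipping event {event['id']}: {exc}")
            skipped.append(event)
    return skipped


# Fake sender standing in for the producer (assumption, for illustration only).
EXISTING_TOPICS = {"orders"}
delivered = []


def fake_send(event):
    if event["topic"] not in EXISTING_TOPICS:
        raise TopicNotAvailableError(f"topic '{event['topic']}' not found")
    delivered.append(event)


events = [
    {"id": 1, "topic": "orders"},
    {"id": 2, "topic": "test"},    # 'test' does not exist, as in the log
    {"id": 3, "topic": "orders"},
]
skipped = send_all(events, fake_send)
```

With this logic, events 1 and 3 are delivered and only event 2 is skipped, instead of the whole task being killed on the first missing topic.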
What behaviour do you see?
The error is logged, and the connector task is stopped when the topic is not found and cannot be created.
If I restart the connector, the error occurs again and the task is stopped again.
WorkerSourceTask{id=tempo-orchestrator-cdc-uat1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Not authorized to describe topic(s) 'test' on the brokers
    at org.apache.kafka.connect.util.TopicAdmin.lambda$describeTopics$0(TopicAdmin.java:471)
    at java.base/java.util.HashMap.forEach(HashMap.java:1336)
    at org.apache.kafka.connect.util.TopicAdmin.describeTopics(TopicAdmin.java:459)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeCreateTopic(WorkerSourceTask.java:403)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:348)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
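The trace passes through `WorkerSourceTask.maybeCreateTopic`, so the worker still seems to attempt topic creation. One possible explanation, which is an assumption based on my reading of the Kafka Connect documentation and not verified on this setup: `topic.creation.enable` is a worker property and `auto.create.topics.enable` is a broker property, so both may be ignored in the connector configuration, while the `topic.creation.default.*` keys define creation rules. The sketch below only audits an excerpt of the configuration above for such keys; `audit_topic_creation` is a hypothetical helper, not an existing tool.

```python
import json

# Assumption: per the Kafka Connect documentation, these keys are not
# connector-level settings.
WORKER_LEVEL_KEYS = {"topic.creation.enable"}      # Connect worker property
BROKER_LEVEL_KEYS = {"auto.create.topics.enable"}  # Kafka broker property


def audit_topic_creation(config):
    """Report keys that may be misplaced and the topic creation rules found."""
    non_connector = WORKER_LEVEL_KEYS | BROKER_LEVEL_KEYS
    misplaced = sorted(k for k in config if k in non_connector)
    creation_rules = sorted(
        k for k in config
        if k.startswith("topic.creation.") and k not in WORKER_LEVEL_KEYS
    )
    return {"misplaced": misplaced, "creation_rules": creation_rules}


# Excerpt of the connector configuration from this report.
config = json.loads("""
{
  "auto.create.topics.enable": "false",
  "topic.creation.enable": "false",
  "topic.creation.default.replication.factor": 1,
  "topic.creation.default.partitions": 10,
  "topic.creation.default.cleanup.policy": "compact",
  "topic.creation.default.compression.type": "lz4"
}
""")

report = audit_topic_creation(config)
print("possibly misplaced keys:", report["misplaced"])
print("topic creation rules:", report["creation_rules"])
```

If this assumption holds, the two `enable` keys would have no effect here and the four `topic.creation.default.*` rules would remain active, which would be consistent with the `describeTopics` call seen in the trace.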
Do you see the same behaviour using the latest released Debezium version?
Yes
How to reproduce the issue using our tutorial deployment?
Hard to reproduce in a local environment: I can't configure a Kafka broker there with topic creation disabled.
To reproduce, send an event to a non-existent topic while topic creation is disabled on the Kafka side.