Bug
Resolution: Done
Major
3.2.0.Final
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Debezium JDBC sink connector, version 3.1.1
What is the connector configuration?
"bootstrap.servers": "{{brokerList}}", "group.id": "demos", "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter", "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter", "key.converter.registry.name": "{{glueRegistryKeysName}}", "value.converter.registry.name": "{{glueRegistryValuesName}}", "key.converter.region": "{{awsRegion}}", "value.converter.region": "{{awsRegion}}", "key.converter.compatibility": "FORWARD", "value.converter.compatibility": "FORWARD", "key.converter.schemaAutoRegistrationEnabled": "true", "value.converter.schemaAutoRegistrationEnabled": "true", "value.converter.schema.registry.url": "http://schema-registry:8081", "key.converter.avroRecordType": "GENERIC_RECORD", "value.converter.avroRecordType": "GENERIC_RECORD", "errors.tolerance": "all", "offset.storage.file.filename": "/tmp/connect.offsets", "offset.flush.interval.ms": "10000", "security.protocol": "SASL_SSL", "sasl.mechanism": "AWS_MSK_IAM", "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "producer.security.protocol": "SASL_SSL", "producer.sasl.mechanism": "AWS_MSK_IAM", "producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "consumer.security.protocol": "SASL_SSL", "consumer.sasl.mechanism": "AWS_MSK_IAM", "consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "column.propagate.source.type": ".*", "connect.protocol": "eager", "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", "connection.loginTimeout": "10", "insert.mode": "upsert", "delete.enabled": "true", "database.time_zone": "UTC", "primary.key.mode": 
"record_key", "schema.evolution": "basic", "table.name.format": "${source.schema}.${source.table}", "heartbeat.interval.ms": "10000", "poll.interval.ms": "300000", "tasks.max": "10",
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
RDS Postgres 16.2
What behavior do you expect?
A connector task should not crash when it collides with another task of the same connector that is trying to create the same table.
What behavior do you see?
I'm using the JDBC sink connector for PostgreSQL with multiple tasks and consistently see what I believe are collisions during the CREATE TABLE step: several tasks appear to try to create the same table at the same time, and the tasks that lose the race fail with the error shown in the logs below. This does not occur when I configure the connector to run with a single task.
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Yes
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
Table Errors: 0
Last Task Failure Message: Connector ctest694-6b196ca6-a074-4da8-a2bb-1e621f55d9a9-stage-20250708-145634884-1 has one or more tasks in a failed state.
{"name":"ctest694-6b196ca6-a074-4da8-a2bb-1e621f55d9a9-stage-20250708-145634884-1","connector":{"state":"RUNNING","worker_id":"ip-10-234-149-183.ec2.internal:32770"},"tasks":[{"id":0,"state":"FAILED","worker_id":"ip-10-234-149-183.ec2.internal:32770","trace":"
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:636)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:121)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:606)
	... 11 more
Caused by: org.hibernate.exception.ConstraintViolationException: JDBC exception executing SQL [CREATE TABLE XXXXX.YYYYY duplicate key value violates unique constraint "pg_type_typname_nsp_index"
 Detail: Key (typname, typnamespace)=(sabsupl, 400275) already exists. in 10324 account
How to reproduce the issue using our tutorial deployment?
Run a jdbc sink connector with more than 1 task (10 in my case) and watch tasks crash as they concurrently try to create the same table from a topic. One succeeds; the others crash.
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Tasks should log the error rather than crash when this collision occurs.
Which use case/requirement will be addressed by the proposed feature?
Running multiple tasks on a JDBC sink connector against PostgreSQL where tables are created automatically.
Implementation ideas (optional)
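One possible direction, sketched here only as an illustration and not as the connector's actual code: treat a failed CREATE TABLE as benign when the underlying SQLException carries a PostgreSQL SQLSTATE that indicates another session created the object first (23505, unique_violation, as in the trace above, or 42P07, duplicate_table). The class and method names below (CreateTableRace, isConcurrentCreateCollision, createTableTolerantly) are hypothetical. The sketch also assumes the DDL runs in its own transaction, since PostgreSQL aborts the enclosing transaction after an error.

```java
import java.sql.SQLException;
import java.util.Set;

// Hypothetical helper; not part of Debezium. Illustrates tolerating a lost CREATE TABLE race.
public class CreateTableRace {

    // PostgreSQL SQLSTATE codes: 23505 = unique_violation, 42P07 = duplicate_table.
    private static final Set<String> BENIGN_STATES = Set.of("23505", "42P07");

    /** A DDL action that may throw, e.g. a wrapped JDBC or Hibernate call. */
    @FunctionalInterface
    public interface Ddl {
        void run() throws Exception;
    }

    /** Walks the cause chain looking for a SQLException with one of the benign SQLSTATEs. */
    public static boolean isConcurrentCreateCollision(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof SQLException
                    && BENIGN_STATES.contains(((SQLException) t).getSQLState())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs the CREATE TABLE action. Returns true if this caller created the table,
     * false if another task appears to have created it first. Any other failure is rethrown.
     */
    public static boolean createTableTolerantly(Ddl createTable) throws Exception {
        try {
            createTable.run();
            return true;
        } catch (Exception e) {
            if (isConcurrentCreateCollision(e)) {
                // Log and continue instead of failing the task.
                System.err.println("Table appears to have been created by another task; continuing: "
                        + e.getMessage());
                return false;
            }
            throw e;
        }
    }
}
```

A caller would wrap only the CREATE TABLE statement in createTableTolerantly, so that a unique violation raised by ordinary inserts is still reported as an error. After a false return, the task would presumably need to re-read the table metadata before writing.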