Type: Bug
Resolution: Done
Priority: Major
Affects Version: 2.0.1.Final
Bug report
For bug reports, please provide the following information:
What Debezium connector do you use and what version?
debezium-connector-postgres version 2.0.1.Final
What is the connector configuration?
{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "test_postgres",
    "table.include.list": "public.test_part_table",
    "snapshot.mode": "never",
    "plugin.name": "pgoutput",
    "slot.name": "test_dbz_slot_123",
    "publication.name": "test_dbz_pub_123",
    "publication.autocreate.mode": "filtered"
  }
}
What is the captured database version and mode of deployment?
PostgreSQL v.15.1, Kubernetes
What behaviour do you expect?
I have pre-created a partitioned table, and a publication with the 'publish_via_partition_root' parameter that covers another table. I expect to be able to create a connector that references them in its configuration, and for the connector to start up and work successfully.
What behaviour do you see?
The connector startup fails with the error:
org.apache.kafka.connect.errors.ConnectException: Unable to create filtered publication test_dbz_pub_123 for
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
2022-12-19 11:51:21,167 ERROR || WorkerSourceTask{id=postgres-source-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Unable to create filtered publication test_dbz_pub_123 for
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createOrUpdatePublicationModeFilterted(PostgresReplicationConnection.java:212)
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:170)
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:383)
	at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:137)
	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:136)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:270)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: No table filters found for filtered publication test_dbz_pub_123
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createOrUpdatePublicationModeFilterted(PostgresReplicationConnection.java:204)
	... 13 more
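A note on the root cause (this is my assumption, not verified against the connector source): the "No table filters found for filtered publication" exception suggests the connector's table-filter lookup does not match the partitioned root table. The catalog distinction it would trip over is easy to show: in pg_class a partitioned root has relkind 'p' while its leaf partitions have relkind 'r', so a lookup restricted to ordinary tables (relkind = 'r') would never find public.test_part_table:

```sql
-- Show the relkind of the partitioned root vs. its partitions.
-- test_part_table has relkind 'p'; test_part_table1/2 have relkind 'r'.
SELECT c.relname, c.relkind
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
  AND c.relname LIKE 'test_part_table%'
ORDER BY c.relname;
```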
How to reproduce the issue using our tutorial deployment?
- Create tables:
CREATE TABLE test_table (
    id BIGSERIAL NOT NULL,
    value INTEGER,
    PRIMARY KEY (id, value)
);

CREATE TABLE test_part_table (
    id BIGSERIAL NOT NULL,
    value INTEGER,
    PRIMARY KEY (id, value)
) PARTITION BY RANGE (value);

CREATE TABLE test_part_table1 PARTITION OF test_part_table FOR VALUES FROM (0) TO (500);
CREATE TABLE test_part_table2 PARTITION OF test_part_table FOR VALUES FROM (500) TO (1000);
- Create a new publication with the 'publish_via_partition_root' parameter:
CREATE PUBLICATION test_dbz_pub_123 FOR TABLE public.test_table WITH (publish_via_partition_root = true);
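To confirm what the pre-created publication actually covers before starting the connector, the standard catalog views can be queried (pg_publication and pg_publication_tables exist in PostgreSQL 10+; the pubviaroot column, PostgreSQL 13+, reflects the publish_via_partition_root setting):

```sql
-- Inspect the publication definition, including publish_via_partition_root
SELECT pubname, pubviaroot FROM pg_publication WHERE pubname = 'test_dbz_pub_123';

-- List the tables the publication currently covers
SELECT * FROM pg_publication_tables WHERE pubname = 'test_dbz_pub_123';
```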
- Create a new connector:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d '
{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "test_postgres",
    "table.include.list": "public.test_part_table",
    "snapshot.mode": "never",
    "plugin.name": "pgoutput",
    "slot.name": "test_dbz_slot_123",
    "publication.name": "test_dbz_pub_123",
    "publication.autocreate.mode": "filtered"
  }
}'
- Check status for created connector:
curl -X GET http://localhost:8083/connectors/postgres-source-connector/status
Expected result (ER):
{
  "name" : "postgres-source-connector",
  "connector" : { "state" : "RUNNING", "worker_id" : "172.19.0.5:8083" },
  "tasks" : [
    { "id" : 0, "state" : "RUNNING", "worker_id" : "172.19.0.5:8083" }
  ],
  "type" : "source"
}
Actual result (AR):
{
  "name": "postgres-source-connector",
  "connector": { "state": "RUNNING", "worker_id": "172.19.0.5:8083" },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.19.0.5:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unable to create filtered publication test_dbz_pub_123 for \n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createOrUpdatePublicationModeFilterted(PostgresReplicationConnection.java:212)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:170)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:383)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:137)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:136)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:270)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:187)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: io.debezium.DebeziumException: No table filters found for filtered publication test_dbz_pub_123\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createOrUpdatePublicationModeFilterted(PostgresReplicationConnection.java:204)\n\t... 13 more\n"
    }
  ],
  "type": "source"
}
Implementation ideas (optional)
I suggest supporting partitioned tables when updating a publication.
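Until that is implemented, a possible manual workaround (a sketch only; I have not verified that it bypasses the connector-side filter check that produced the error above) is to add the partitioned root to the existing publication yourself. This is also essentially the statement the connector would need to issue when updating a filtered publication that includes partitioned tables:

```sql
-- Add the partitioned root table to the existing publication; with
-- publish_via_partition_root = true, changes to all partitions are then
-- published under the root table's identity.
ALTER PUBLICATION test_dbz_pub_123 ADD TABLE public.test_part_table;
```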