-
Bug
-
Resolution: Done
-
Major
-
2.2.0.Final
-
None
Bug report
What Debezium connector do you use and what version?
Postgres connector 2.2
What is the connector configuration?
connector.class=io.debezium.connector.postgresql.PostgresConnector transforms.unwrap.delete.handling.mode=rewrite topic.creation.default.partitions=3 incremental.snapshot.chunk.size=60000 tasks.max=1 publication.name=debezium transforms=unwrap schema.include.list=public database.sslmode=require topic.prefix=suggestion truncate.handling.mode=include signal.data.collection=public.debezium_signal transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState publication.autocreate.mode=disabled
What is the captured database version and mode of depoyment?
RDS Postgres 13
What behaviour do you expect?
The snapshot to complete
What behaviour do you see?
The snapshot errors repeatedly with the following stacktrace:
Do you see the same behaviour using the latest relesead Debezium version?
I can reproduce it on main via the IncrementalSnapshotIT via this patch:
Index: debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java IDEA additional info: Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP <+>UTF-8 =================================================================== diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java (revision a58f4b4d4f93d44edee39abf2ac73efd1fc035cf) +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java (date 1684403611694) @@ -48,7 +48,9 @@ + "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));" + "CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);" + "CREATE TABLE s1.anumeric (pk numeric, aa integer, PRIMARY KEY(pk));" - + "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));"; + + "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));" + + "CREATE TYPE enum_type AS ENUM ('UP', 'DOWN', 'LEFT', 'RIGHT', 'STORY');" + + "CREATE TABLE s1.enumpk (pk enum_type, aa integer, PRIMARY KEY(pk));"; @Before public void before() throws SQLException { @@ -167,6 +169,37 @@ TestHelper.waitForDefaultReplicationSlotBeActive(); } + @Test + public void insertsEnumPk() throws Exception { + // Testing.Print.enable(); + final List<String> ENUM = List.of("UP", "DOWN", "LEFT", "RIGHT", "STORY"); + + try (JdbcConnection connection = databaseConnection()) { + connection.setAutoCommit(false); + for (int i = 0; i < ENUM.size(); i++) { + connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", + "s1.enumpk", connection.quotedColumnIdString(pkFieldName()),"'"+ENUM.get(i)+"'", i)); + } + connection.commit(); + } + startConnector(); + + sendAdHocSnapshotSignal("s1.enumpk"); + + final int expectedRecordCount = ROW_COUNT; + final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot( + expectedRecordCount, + x -> true, + k -> VariableScaleDecimal.toLogical(k.getStruct("pk")).getWrappedValue().intValue(), + record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()), + "test_server.s1.anumeric", + null); + for (int i = 0; i < expectedRecordCount; i++) { + assertThat(dbChanges).contains(entry(i + 1, i)); + } + } + + @Test public void inserts4Pks() throws Exception { // Testing.Print.enable();
Do you have the connector logs, ideally from start till finish?
io.debezium.DebeziumException: Snapshotting of table public.suggestion failed at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:635) at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:366) at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.init(AbstractIncrementalSnapshotChangeEventSource.java:301) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$initStreamEvents$1(ChangeEventSourceCoordinator.java:187) at java.base/java.util.Optional.ifPresent(Optional.java:183) at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:187) at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172) at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.postgresql.util.PSQLException: ERROR: operator does not exist: suggestion_type > character varying Hint: No operator matches the given name and argument types. You might need to add explicit type casts. Position: 116 at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676) at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366) at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356) at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496) at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413) at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190) at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:134) at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:591) ... 13 more org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:68) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829)
How to reproduce the issue using our tutorial deployment?
1. Create a user-defined type
2. Create a table that uses that user-defined type as part of its PK
3. Trigger an incremental snapshot for that table
Example for steps 1 and 2:
CREATE TYPE enum_type AS ENUM ('UP', 'DOWN', 'LEFT', 'RIGHT', 'STORY'); CREATE TABLE s1.enumpk (pk enum_type, aa integer, PRIMARY KEY(pk));
- links to
-
RHEA-2024:129636 Red Hat build of Debezium 2.5.4 release