Debezium / DBZ-6481

Postgres - Incremental snapshot fails on tables with an enum type in the primary key

    • Moderate

      Bug report

       

      What Debezium connector do you use and what version?

      Postgres connector 2.2

      What is the connector configuration?

       

      connector.class=io.debezium.connector.postgresql.PostgresConnector
      transforms.unwrap.delete.handling.mode=rewrite
      topic.creation.default.partitions=3
      incremental.snapshot.chunk.size=60000
      tasks.max=1
      publication.name=debezium
      transforms=unwrap
      schema.include.list=public
      database.sslmode=require
      topic.prefix=suggestion
      truncate.handling.mode=include
      signal.data.collection=public.debezium_signal
      transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
      publication.autocreate.mode=disabled

       

      What is the captured database version and mode of deployment?

      RDS Postgres 13 

      What behaviour do you expect?

      The snapshot to complete

      What behaviour do you see?

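      The snapshot fails repeatedly with a PSQLException ("operator does not exist: suggestion_type > character varying"). The full stack trace is in the connector logs below.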

       

      Do you see the same behaviour using the latest released Debezium version?

      Yes. I can reproduce it on main in IncrementalSnapshotIT with this patch:

       

       

       

      Index: debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
      IDEA additional info:
      Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
      <+>UTF-8
      ===================================================================
      diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java
      --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java    (revision a58f4b4d4f93d44edee39abf2ac73efd1fc035cf)
      +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/IncrementalSnapshotIT.java    (date 1684403611694)
      @@ -48,7 +48,9 @@
                   + "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
                   + "CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);"
                   + "CREATE TABLE s1.anumeric (pk numeric, aa integer, PRIMARY KEY(pk));"
      -            + "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";
      +            + "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));"
      +            + "CREATE TYPE enum_type AS ENUM ('UP', 'DOWN', 'LEFT', 'RIGHT', 'STORY');"
      +            + "CREATE TABLE s1.enumpk (pk enum_type, aa integer, PRIMARY KEY(pk));";
       
           @Before
           public void before() throws SQLException {
      @@ -167,6 +169,37 @@
               TestHelper.waitForDefaultReplicationSlotBeActive();
           }
       
      +    @Test
      +    public void insertsEnumPk() throws Exception {
      +        // Testing.Print.enable();
      +        final List<String> ENUM = List.of("UP", "DOWN", "LEFT", "RIGHT", "STORY");
      +
      +        try (JdbcConnection connection = databaseConnection()) {
      +            connection.setAutoCommit(false);
      +            for (int i = 0; i < ENUM.size(); i++) {
      +                connection.executeWithoutCommitting(String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)",
      +                    "s1.enumpk", connection.quotedColumnIdString(pkFieldName()),"'"+ENUM.get(i)+"'", i));
      +            }
      +            connection.commit();
      +        }
      +        startConnector();
      +
      +        sendAdHocSnapshotSignal("s1.enumpk");
      +
      +        final int expectedRecordCount = ENUM.size();
      +        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
      +            expectedRecordCount,
      +            x -> true,
      +            k -> ENUM.indexOf(k.getString("pk")),
      +            record -> ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()),
      +            "test_server.s1.enumpk",
      +            null);
      +        for (int i = 0; i < expectedRecordCount; i++) {
      +            assertThat(dbChanges).contains(entry(i, i));
      +        }
      +    }
      +
      +
           @Test
           public void inserts4Pks() throws Exception {
               // Testing.Print.enable();
      
       

      Do you have the connector logs, ideally from start till finish?

       

      io.debezium.DebeziumException: Snapshotting of table public.suggestion failed
      at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:635)
      at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:366)
      at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.init(AbstractIncrementalSnapshotChangeEventSource.java:301)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$initStreamEvents$1(ChangeEventSourceCoordinator.java:187)
      at java.base/java.util.Optional.ifPresent(Optional.java:183)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:187)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:172)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.postgresql.util.PSQLException: ERROR: operator does not exist: suggestion_type > character varying
      Hint: No operator matches the given name and argument types. You might need to add explicit type casts.
      Position: 116
      at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
      at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
      at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
      at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
      at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
      at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
      at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:134)
      at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.createDataEventsForTable(AbstractIncrementalSnapshotChangeEventSource.java:591)
      ... 13 more
      org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
      at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:68)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:116)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
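
      The "Caused by" line shows Postgres rejecting a comparison between the enum key column and a value typed as character varying. The sketch below illustrates that kind of predicate and one possible shape of a version with an explicit cast. It is a simplified illustration only, not Debezium's actual chunk query or fix: the class name ChunkQuerySketch, the method buildChunkQuery and the query layout are all assumptions.

```java
import java.util.Optional;

public class ChunkQuerySketch {

    /**
     * Builds a simplified "next chunk" query (hypothetical layout):
     *   SELECT * FROM <table> WHERE <pk> > <placeholder> ORDER BY <pk> LIMIT <n>
     * When castType is present, the bind parameter is wrapped in an explicit
     * CAST so that an enum column is compared against a value of its own type.
     */
    public static String buildChunkQuery(String table, String pkColumn,
                                         Optional<String> castType, int limit) {
        String placeholder = castType
                .map(t -> "CAST(? AS " + t + ")")
                .orElse("?");
        return "SELECT * FROM " + table
                + " WHERE " + pkColumn + " > " + placeholder
                + " ORDER BY " + pkColumn
                + " LIMIT " + limit;
    }

    public static void main(String[] args) {
        // Plain placeholder: a string-typed bind value here is what leads to
        // "operator does not exist: <enum> > character varying".
        System.out.println(buildChunkQuery("s1.enumpk", "pk", Optional.empty(), 10));
        // Placeholder with an explicit cast to the enum type.
        System.out.println(buildChunkQuery("s1.enumpk", "pk", Optional.of("enum_type"), 10));
    }
}
```

      The table and type names come from the reproduction example in this report. Whether a cast in the query is the right fix for the connector is left open here.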

       

      How to reproduce the issue using our tutorial deployment?

      1. Create a user-defined type
      2. Create a table that uses that user-defined type as part of its PK
      3. Trigger an incremental snapshot for that table

      Example for steps 1 and 2:

      CREATE TYPE enum_type AS ENUM ('UP', 'DOWN', 'LEFT', 'RIGHT', 'STORY');
      CREATE TABLE s1.enumpk (pk enum_type, aa integer, PRIMARY KEY(pk));
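
      Step 3 can be sketched as an insert into the signal table. The helper below only builds the SQL text. It assumes the signal table has the id, type and data columns used in the test patch above, and that the signal is an execute-snapshot signal of type incremental. The class name SnapshotSignalSketch and the method buildSignalInsert are hypothetical, and the string concatenation is for illustration with trusted input only.

```java
public class SnapshotSignalSketch {

    /**
     * Builds an INSERT statement that requests an ad hoc incremental snapshot
     * of a single table through the signal table.
     */
    public static String buildSignalInsert(String signalTable, String id, String dataCollection) {
        // JSON payload naming the table to snapshot and the snapshot type.
        String data = "{\"data-collections\": [\"" + dataCollection + "\"], \"type\": \"incremental\"}";
        return "INSERT INTO " + signalTable + " (id, type, data) VALUES ('"
                + id + "', 'execute-snapshot', '" + data + "');";
    }

    public static void main(String[] args) {
        // Signal table name taken from signal.data.collection in the configuration above.
        System.out.println(buildSignalInsert("public.debezium_signal", "ad-hoc-1", "s1.enumpk"));
    }
}
```

      Running the printed statement against the captured database after creating the type and table should trigger the snapshot of s1.enumpk, provided that schema is included in the connector configuration.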

       

              Assignee: Unassigned
              Reporter: Frederic Laurent (Inactive)