Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-7129

Incremental snapshotting blocks when transaction failure occurs

XMLWordPrintable

    • False
    • Hide

      None

      Show
      None
    • False
    • Important

      Bug report

      What Debezium connector do you use and what version?

      Debezium PostgreSQL connector version 2.4.0.Final.

      What is the connector configuration?

      The issue has been simulated using a local set-up, so as not to cause downtime on existing environments.

      The local connector set-up is:

       

      {
          "name": "debezium-connector",
          "config": {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
              "database.hostname": "postgres", 
              "database.port": "5432", 
              "database.user": "postgres", 
              "database.password": "postgres", 
              "database.dbname" : "postgres", 
              "topic.prefix": "prefix",
              "plugin.name": "pgoutput",
              "snapshot.mode": "never",
              "slot.name": "debezium",
              "signal.kafka.topic": "debezium-signal-request",
              "topic.transaction": "transactions",
              "publication.name": "debezium_publication",
              "signal.enabled.channels": "source",
              "signal.data.collection": "inetrnal_schema.debezium_signal_table",
              "signal.poll.interval.ms": "2000",
              "signal.kafka.poll.timeout.ms": "2000",
              "notification.enabled.channels": "log",
              "producer.override.compression.type": "lz4",
              "producer.override.linger.ms": "50",
              "producer.override.batch.size": "16384",
              "producer.override.max.request.size": "2097152"
          }
      } 

       

      What is the captured database version and mode of depoyment?

      The issue is not related to a specific database.

      What behaviour do you expect?

      When making a snapshot request for multiple data collections, in the case of an erroneous filter statement on one or more of the collections (when additional conditions are specified), the data collections with the erroneous filters should be skipped. Only the correct filter conditions should be processed.

      What behaviour do you see?

      In case of an erroneous filter on one or more of the collections, incremental snapshotting blocks. Other incremental snapshot requests following the one with erroneous filters are never processed.

      This behaviour is most easily reproduced on a freshly created connector.

      The observed behaviour is as follows:

      org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block 

      This issue is caused by the fact that an attempt is made to reuse a SQL connection where a transaction has failed - the DB refuses to execute other commands on the same connection.

      From my own understanding, possible the following could be considered:

      • retrigger/rollback transaction in case of failure
      • terminate snapshotting in case of a non-recoverable SQL error so as not to block subsequent snapshot requests

      Do you see the same behaviour using the latest relesead Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes - the issu is reproducible with version 2.5.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      • The first failure when the first data collection with the misspelt column in the filter is processed:
      Failed to read maximum key for table **********   [io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource]
      2023-11-10T13:31:54.961279913Z org.postgresql.util.PSQLException: ERROR: column "**********" does not exist
      2023-11-10T13:31:54.961336513Z   Position: 101
      2023-11-10T13:31:54.961392513Z     at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
      2023-11-10T13:31:54.961447513Z     at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
      2023-11-10T13:31:54.962645118Z     at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
      2023-11-10T13:31:54.962705118Z     at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
      2023-11-10T13:31:54.962933919Z     at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
      2023-11-10T13:31:54.962991219Z     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:335)
      2023-11-10T13:31:54.963046719Z     at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:321)
      2023-11-10T13:31:54.963151420Z     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:297)
      2023-11-10T13:31:54.963207120Z     at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:246)
      2023-11-10T13:31:54.963261720Z     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:626)
      2023-11-10T13:31:54.963316720Z     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:495)
      2023-11-10T13:31:54.963417921Z     at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:355)
      2023-11-10T13:31:54.963474721Z     at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(AbstractIncrementalSnapshotChangeEventSource.java:521)
      2023-11-10T13:31:54.963530021Z     at io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot.arrived(ExecuteSnapshot.java:78)
      2023-11-10T13:31:54.963593022Z     at io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:191)
      2023-11-10T13:31:54.963708022Z     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      2023-11-10T13:31:54.963764422Z     at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
      2023-11-10T13:31:54.963820622Z     at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
      2023-11-10T13:31:54.963881623Z     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
      2023-11-10T13:31:54.964033223Z     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      2023-11-10T13:31:54.964089023Z     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
      2023-11-10T13:31:54.964144124Z     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
      2023-11-10T13:31:54.964199124Z     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      2023-11-10T13:31:54.964324424Z     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      2023-11-10T13:31:54.964392625Z     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      2023-11-10T13:31:54.964458925Z     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      2023-11-10T13:31:54.964527625Z     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      2023-11-10T13:31:54.964649626Z     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
      2023-11-10T13:31:54.964857926Z     at io.debezium.pipeline.signal.SignalProcessor.lambda$processSourceSignal$4(SignalProcessor.java:155)
      2023-11-10T13:31:54.964982127Z     at io.debezium.pipeline.signal.SignalProcessor.executeWithSemaphore(SignalProcessor.java:165)
      2023-11-10T13:31:54.965054327Z     at io.debezium.pipeline.signal.SignalProcessor.processSourceSignal(SignalProcessor.java:149)
      2023-11-10T13:31:54.965122127Z     at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:283)
      2023-11-10T13:31:54.965473128Z     at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:79)
      2023-11-10T13:31:54.965544129Z     at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
      2023-11-10T13:31:54.974012462Z     at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:94)
      2023-11-10T13:31:54.974087262Z     at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:264)
      2023-11-10T13:31:54.974165062Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processReplicationMessages(PostgresStreamingChangeEventSource.java:319)
      2023-11-10T13:31:54.974233362Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:221)
      2023-11-10T13:31:54.974299063Z     at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:432)
      2023-11-10T13:31:54.974369863Z     at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:208)
      2023-11-10T13:31:54.974435863Z     at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:41)
      2023-11-10T13:31:54.974503263Z     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:637)
      2023-11-10T13:31:54.974587364Z     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:629)
      2023-11-10T13:31:54.974655764Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:221)
      2023-11-10T13:31:54.974724264Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:183)
      2023-11-10T13:31:54.974792065Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:42)
      2023-11-10T13:31:54.974858565Z     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:272)
      2023-11-10T13:31:54.974925165Z     at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
      2023-11-10T13:31:54.974992865Z     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
      2023-11-10T13:31:54.975060165Z     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      2023-11-10T13:31:54.975128966Z     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      2023-11-10T13:31:54.975525367Z     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      2023-11-10T13:31:54.975594968Z     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      2023-11-10T13:31:54.975667868Z     at java.base/java.lang.Thread.run(Thread.java:829)
      • Then when the 2nd data collection is being processed, the attempt at executing another command on the failed transaction connection fails with:
      io.debezium.DebeziumException: Database error while executing incremental snapshot ........
      2023-11-10T13:31:54.991390829Z     at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:411)
      2023-11-10T13:31:54.991463429Z     at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(AbstractIncrementalSnapshotChangeEventSource.java:521)
      2023-11-10T13:31:54.991528429Z     at io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot.arrived(ExecuteSnapshot.java:78)
      2023-11-10T13:31:54.991600330Z     at io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:191)
      2023-11-10T13:31:54.991663330Z     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      2023-11-10T13:31:54.991729330Z     at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
      2023-11-10T13:31:54.991799631Z     at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
      2023-11-10T13:31:54.991865731Z     at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
      2023-11-10T13:31:54.991932131Z     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
      2023-11-10T13:31:54.992006731Z     at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
      2023-11-10T13:31:54.992071532Z     at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
      2023-11-10T13:31:54.992143632Z     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      2023-11-10T13:31:54.992226332Z     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      2023-11-10T13:31:54.992290033Z     at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      2023-11-10T13:31:54.992364333Z     at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      2023-11-10T13:31:54.992429933Z     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      2023-11-10T13:31:54.992494933Z     at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
      2023-11-10T13:31:54.992557134Z     at io.debezium.pipeline.signal.SignalProcessor.lambda$processSourceSignal$4(SignalProcessor.java:155)
      2023-11-10T13:31:54.992621334Z     at io.debezium.pipeline.signal.SignalProcessor.executeWithSemaphore(SignalProcessor.java:165)
      2023-11-10T13:31:54.992684134Z     at io.debezium.pipeline.signal.SignalProcessor.processSourceSignal(SignalProcessor.java:149)
      2023-11-10T13:31:54.992747534Z     at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:283)
      2023-11-10T13:31:54.992815135Z     at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:79)
      2023-11-10T13:31:54.992880135Z     at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
      2023-11-10T13:31:54.992943635Z     at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:94)
      2023-11-10T13:31:54.993006835Z     at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:264)
      2023-11-10T13:31:54.993074436Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processReplicationMessages(PostgresStreamingChangeEventSource.java:319)
      2023-11-10T13:31:54.993231936Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.lambda$processMessages$0(PostgresStreamingChangeEventSource.java:221)
      2023-11-10T13:31:54.993297437Z     at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.decodeInsert(PgOutputMessageDecoder.java:432)
      2023-11-10T13:31:54.993364637Z     at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:208)
      2023-11-10T13:31:54.993430137Z     at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:41)
      2023-11-10T13:31:54.997494253Z     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:637)
      2023-11-10T13:31:54.997656753Z     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:629)
      2023-11-10T13:31:54.997724254Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:221)
      2023-11-10T13:31:54.997787954Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:183)
      2023-11-10T13:31:54.997869454Z     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:42)
      2023-11-10T13:31:54.997935555Z     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:272)
      2023-11-10T13:31:54.997999455Z     at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
      2023-11-10T13:31:54.998064655Z     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
      2023-11-10T13:31:54.998136355Z     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      2023-11-10T13:31:54.998202455Z     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      2023-11-10T13:31:54.998264656Z     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      2023-11-10T13:31:54.998329256Z     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      2023-11-10T13:31:54.998393156Z     at java.base/java.lang.Thread.run(Thread.java:829)
      2023-11-10T13:31:54.998456856Z Caused by: org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
      2023-11-10T13:31:54.998611357Z     at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
      2023-11-10T13:31:54.998683258Z     at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
      2023-11-10T13:31:54.998746358Z     at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
      2023-11-10T13:31:54.998808758Z     at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
      2023-11-10T13:31:54.998876658Z     at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
      2023-11-10T13:31:54.998940558Z     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:335)
      2023-11-10T13:31:54.999199659Z     at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:321)
      2023-11-10T13:31:54.999321960Z     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:297)
      2023-11-10T13:31:54.999386160Z     at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:246)
      2023-11-10T13:31:54.999448860Z     at org.postgresql.jdbc.PgDatabaseMetaData.getTables(PgDatabaseMetaData.java:1347)
      2023-11-10T13:31:54.999512060Z     at io.debezium.jdbc.JdbcConnection.readSchema(JdbcConnection.java:1157)
      2023-11-10T13:31:54.999590261Z     at io.debezium.connector.postgresql.PostgresSchema.refresh(PostgresSchema.java:117)
      2023-11-10T13:31:54.999662861Z     at io.debezium.connector.postgresql.PostgresSignalBasedIncrementalSnapshotChangeEventSource.refreshTableSchema(PostgresSignalBasedIncrementalSnapshotChangeEventSource.java:57)
      2023-11-10T13:31:54.999728361Z     at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:352)
      2023-11-10T13:31:54.999803062Z     ... 42 more
      2023-11-10T13:31:54.999870862Z Caused by: org.postgresql.util.PSQLException: ERROR: column "********" does not exist
      2023-11-10T13:31:54.999934262Z   Position: 101
      2023-11-10T13:31:54.999997562Z     at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
      2023-11-10T13:31:55.000062263Z     at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
      2023-11-10T13:31:55.000124163Z     at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
      2023-11-10T13:31:55.000196663Z     at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
      2023-11-10T13:31:55.000265763Z     at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
      2023-11-10T13:31:55.000334964Z     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:335)
      2023-11-10T13:31:55.000398364Z     at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:321)
      2023-11-10T13:31:55.000462964Z     at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:297)
      2023-11-10T13:31:55.000538364Z     at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:246)
      2023-11-10T13:31:55.000600165Z     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:626)
      2023-11-10T13:31:55.000666465Z     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:495)
      2023-11-10T13:31:55.000729965Z     at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:355)
      2023-11-10T13:31:55.000798466Z     ... 42 more 

       

      How to reproduce the issue using our tutorial deployment?

      I used the docker-compose in https://github.com/debezium/debezium-examples/blob/main/tutorial/docker-compose-postgres.yaml as the basis for my local set-up.

      Let me know if you need me to provide my docker-compose set-up.

      It should be noted that I have omitted schema details as I am testing with a company-properiatary schema. However, the above issue can easily be reporoduced with any schema.

              Unassigned Unassigned
              mshtarbev Miroslav Shtarbev (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: