Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-6026

Offsets are not flushed on connect offsets topic when encountering an error on Postgres connector

XMLWordPrintable

    • Critical

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Latest 2.1.1 Final docker image

      Using Postgresql connector

      What is the connector configuration?

      {
          "name": "inventory-connector",
          "config": {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
              "tasks.max": "1",
              "database.hostname": "postgres",
              "database.port": "5432",
              "database.user": "postgres",
              "database.password": "postgres",
              "database.dbname" : "postgres",
              "topic.prefix": "db",
              "schema.include.list": "public",
              "plugin.name": "pgoutput"
          }
      }

      What is the captured database version and mode of deployment?

      Local docker deployment using the tutorial docker-compose.yaml

      What behaviour do you expect?

      Connector should be able to restart on an error like 

      tutorial-connect-1  | 2023-01-19 20:00:36,889 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} failed to send record to db.public.large_columns:    [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
      tutorial-connect-1  | org.apache.kafka.common.errors.RecordTooLargeException: The message is 8002329 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration. 

      What behaviour do you see?

      If I change Kafka connect / brokers settings to allow bigger messages : 

      tutorial-connect-1  | io.debezium.DebeziumException: Cannot seek to the last known offset '0/20EE148' on replication slot 'debezium'. Error from server: ERROR: cannot advance replication slot to 0/20EE148, minimum is 0/20EE2A8
      tutorial-connect-1  |   at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.validateSlotIsInExpectedState(PostgresReplicationConnection.java:376)
      tutorial-connect-1  |   at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:328)
      tutorial-connect-1  |   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:137)
      tutorial-connect-1  |   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
      tutorial-connect-1  |   at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
      tutorial-connect-1  |   at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
      tutorial-connect-1  |   at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
      tutorial-connect-1  |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      tutorial-connect-1  |   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      tutorial-connect-1  |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      tutorial-connect-1  |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      tutorial-connect-1  |   at java.base/java.lang.Thread.run(Thread.java:829) 

       

      Do you have the connector logs, ideally from start till finish?

      First error (RecordTooLargeException) : 

      tutorial-connect-1  | 2023-01-19 20:00:36,852 INFO   ||  1 records sent during previous 00:02:41.968, last recorded offset of {server=db} partition is {transaction_id=null, lsn_proc=34628064, lsn_commit=34529960, lsn=34628064, txId=767,
       ts_usec=1674158436234989}   [io.debezium.connector.common.BaseSourceTask]
      tutorial-connect-1  | 2023-01-19 20:00:36,889 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} failed to send record to db.public.large_columns:    [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
      tutorial-connect-1  | org.apache.kafka.common.errors.RecordTooLargeException: The message is 8002329 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      tutorial-connect-1  | 2023-01-19 20:00:41,896 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
      tutorial-connect-1  | org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
      tutorial-connect-1  |   at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:312)
      tutorial-connect-1  |   at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToPollTask(WorkerSourceTask.java:113)
      tutorial-connect-1  |   at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:342)
      tutorial-connect-1  |   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
      tutorial-connect-1  |   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
      tutorial-connect-1  |   at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
      tutorial-connect-1  |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      tutorial-connect-1  |   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      tutorial-connect-1  |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      tutorial-connect-1  |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      tutorial-connect-1  |   at java.base/java.lang.Thread.run(Thread.java:829)
      tutorial-connect-1  | Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 8002329 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
      tutorial-connect-1  | 2023-01-19 20:00:41,904 INFO   ||  Stopping down connector   [io.debezium.connector.common.BaseSourceTask] 

      2nd error 

      tutorial-connect-1  | 2023-01-19 20:04:48,983 INFO   Postgres|db|postgres-connector-task  Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/20EE2A8}, catalogXmin=762]   [io.debezium.connector.postgres
      ql.connection.PostgresConnection]
      tutorial-connect-1  | 2023-01-19 20:04:48,984 INFO   Postgres|db|postgres-connector-task  Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[serve
      r='db'db='postgres', lsn=LSN{0/20EE148}, txId=766, lastCommitLsn=LSN{0/20EE148}, timestamp=2023-01-19T19:57:53.704983Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{0/20EE148}, lastCommitLsn
      =LSN{0/20EE148}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEnd
      Position=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]   [io.debezium.connector.postgresql.PostgresConnectorTask]
      tutorial-connect-1  | 2023-01-19 20:04:48,985 INFO   Postgres|db|postgres-connector-task  Requested thread factory for connector PostgresConnector, id = db named = change-event-source-coordinator   [io.debezium.util.Threads]
      tutorial-connect-1  | 2023-01-19 20:04:48,985 INFO   Postgres|db|postgres-connector-task  Creating thread debezium-postgresconnector-db-change-event-source-coordinator   [io.debezium.util.Threads]
      tutorial-connect-1  | 2023-01-19 20:04:48,986 INFO   ||  WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start   [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
      tutorial-connect-1  | 2023-01-19 20:04:48,986 INFO   Postgres|db|snapshot  Metrics registered   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:48,986 INFO   Postgres|db|snapshot  Context created   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:48,987 INFO   Postgres|db|snapshot  Previous snapshot has completed successfully, streaming logical changes from last known position   [io.debezium.connector.postgresql.snapshot.InitialSnapshotter]
      tutorial-connect-1  | 2023-01-19 20:04:48,987 INFO   Postgres|db|snapshot  According to the connector configuration no snapshot will be executed   [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
      tutorial-connect-1  | 2023-01-19 20:04:48,987 INFO   Postgres|db|snapshot  Snapshot ended with SnapshotResult [status=SKIPPED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, source
      Info=source_info[server='db'db='postgres', lsn=LSN{0/20EE148}, txId=766, lastCommitLsn=LSN{0/20EE148}, timestamp=2023-01-19T19:57:53.704983Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{0/2
      0EE148}, lastCommitLsn=LSN{0/20EE148}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [window
      Opened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:48,987 INFO   Postgres|db|streaming  Connected metrics set to 'true'   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:49,061 INFO   Postgres|db|streaming  REPLICA IDENTITY for 'public.large_columns' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns   [io.debezium.connector.pos
      tgresql.PostgresSchema]
      tutorial-connect-1  | 2023-01-19 20:04:49,061 INFO   Postgres|db|streaming  Starting streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:49,062 INFO   Postgres|db|streaming  Retrieved latest position from stored offset 'LSN{0/20EE148}'   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
      tutorial-connect-1  | 2023-01-19 20:04:49,062 INFO   Postgres|db|streaming  Looking for WAL restart position for last commit LSN 'LSN{0/20EE148}' and last change LSN 'LSN{0/20EE148}'   [io.debezium.connector.postgresql.connection.WalPos
      itionLocator]
      tutorial-connect-1  | 2023-01-19 20:04:49,062 INFO   Postgres|db|streaming  Initializing PgOutput logical decoder publication   [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
      tutorial-connect-1  | 2023-01-19 20:04:49,225 INFO   Postgres|db|streaming  Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/20EE2A8}, catalogXmin=762]   [io.debezium.connector.postgresql.connection.
      PostgresConnection]
      tutorial-connect-1  | 2023-01-19 20:04:49,226 INFO   Postgres|db|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
      tutorial-connect-1  | 2023-01-19 20:04:49,227 INFO   Postgres|db|streaming  Seeking to LSN{0/20EE148} on the replication slot with command SELECT pg_replication_slot_advance('debezium', '0/20EE148')   [io.debezium.connector.postgresql.c
      onnection.PostgresReplicationConnection]
      tutorial-connect-1  | 2023-01-19 20:04:49,234 ERROR  Postgres|db|streaming  Producer failure   [io.debezium.pipeline.ErrorHandler]
      tutorial-connect-1  | io.debezium.DebeziumException: Cannot seek to the last known offset '0/20EE148' on replication slot 'debezium'. Error from server: ERROR: cannot advance replication slot to 0/20EE148, minimum is 0/20EE2A8
      tutorial-connect-1  |   at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.validateSlotIsInExpectedState(PostgresReplicationConnection.java:376)
      tutorial-connect-1  |   at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:328)
      tutorial-connect-1  |   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:137)
      tutorial-connect-1  |   at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
      tutorial-connect-1  |   at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
      tutorial-connect-1  |   at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
      tutorial-connect-1  |   at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
      tutorial-connect-1  |   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      tutorial-connect-1  |   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
      tutorial-connect-1  |   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      tutorial-connect-1  |   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      tutorial-connect-1  |   at java.base/java.lang.Thread.run(Thread.java:829)
      tutorial-connect-1  | 2023-01-19 20:04:49,235 INFO   Postgres|db|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
      tutorial-connect-1  | 2023-01-19 20:04:49,241 INFO   Postgres|db|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
      tutorial-connect-1  | 2023-01-19 20:04:49,241 INFO   Postgres|db|streaming  Finished streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:49,241 INFO   Postgres|db|streaming  Connected metrics set to 'false'   [io.debezium.pipeline.ChangeEventSourceCoordinator]
      tutorial-connect-1  | 2023-01-19 20:04:49,488 ERROR  ||  WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kaf
      ka.connect.runtime.WorkerTask]
      tutorial-connect-1  | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.

      How to reproduce the issue using our tutorial deployment?

      docker-compose up -d 
      
      curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
      
      postgres=# create table large_columns (a bigserial primary key, b text);
      CREATE TABLE
      postgres=# insert into large_columns (b) values (repeat('abcdefgh', 10));
      INSERT 0 1
      
      

      After Connect flush offsets : 

      ➜  ~ kaf consume my_connect_offsets -f
      Key:         [ "inventory-connector", { "server": "db" } ]
      Partition:   14
      Offset:      0
      Timestamp:   2023-01-19 20:58:07.277 +0100 CET
      {
        "lsn": 34529608,
        "lsn_proc": 34529608,
        "transaction_id": null,
        "ts_usec": 1674158273704983,
        "txId": 766
      } 
      
      postgres=# select slot_name, active, restart_lsn, restart_lsn - '0/0' as restart_lsn_int, confirmed_flush_lsn, confirmed_flush_lsn - '0/0' as confirmed_flush_lsn_int from pg_replication_slots ; ┌───────────┬────────┬─────────────┬─────────────────┬─────────────────────┬─────────────────────────┐ │ slot_name │ active │ restart_lsn │ restart_lsn_int │ confirmed_flush_lsn │ confirmed_flush_lsn_int │ ├───────────┼────────┼─────────────┼─────────────────┼─────────────────────┼─────────────────────────┤ │ debezium  │ t      │ 0/2070530   │        34014512 │ 0/20EE148           │                34529608 │ └───────────┴────────┴─────────────┴─────────────────┴─────────────────────┴─────────────────────────┘ (1 row)

      Then insert a large message to trigger the RecordTooLarge Exception

      postgres=# insert into large_columns (b) values (repeat('abcdefgh', 1000000));
      INSERT 0 1
      
      postgres=# select slot_name, active, restart_lsn, restart_lsn - '0/0' as restart_lsn_int, confirmed_flush_lsn, confirmed_flush_lsn - '0/0' as confirmed_flush_lsn_int from pg_replication_slots ;
      ┌───────────┬────────┬─────────────┬─────────────────┬─────────────────────┬─────────────────────────┐
      │ slot_name │ active │ restart_lsn │ restart_lsn_int │ confirmed_flush_lsn │ confirmed_flush_lsn_int │
      ├───────────┼────────┼─────────────┼─────────────────┼─────────────────────┼─────────────────────────┤
      │ debezium  │ f      │ 0/2070530   │        34014512 │ 0/20EE2A8           │                34529960 │
      └───────────┴────────┴─────────────┴─────────────────┴─────────────────────┴─────────────────────────┘ 

      No message will be produced to the my_connect_offsets topic while the confirmed_flush_lsn advanced.

      Maybe because the record was large and chunked by PG ?

      curl -X POST localhost:8083/connectors/inventory-connector/tasks/0/restart 

      will lead to the issue where the LSN stored in the offset topic is BEHIND the one on the slot.

       

              Unassigned Unassigned
              fuyar UYAR Farid
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: