- Bug
- Resolution: Done
- Major
- 2.1.1.Final
- None
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Latest 2.1.1.Final Docker image
Using the PostgreSQL connector
What is the connector configuration?
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname" : "postgres", "topic.prefix": "db", "schema.include.list": "public", "plugin.name": "pgoutput" } }
What is the captured database version and mode of deployment?
Local docker deployment using the tutorial docker-compose.yaml
What behaviour do you expect?
The connector should be able to restart after an error like:
tutorial-connect-1 | 2023-01-19 20:00:36,889 ERROR || WorkerSourceTask{id=inventory-connector-0} failed to send record to db.public.large_columns: [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
tutorial-connect-1 | org.apache.kafka.common.errors.RecordTooLargeException: The message is 8002329 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
What behaviour do you see?
If I change the Kafka Connect / broker settings to allow bigger messages, I get:
tutorial-connect-1 | io.debezium.DebeziumException: Cannot seek to the last known offset '0/20EE148' on replication slot 'debezium'. Error from server: ERROR: cannot advance replication slot to 0/20EE148, minimum is 0/20EE2A8
tutorial-connect-1 |     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.validateSlotIsInExpectedState(PostgresReplicationConnection.java:376)
tutorial-connect-1 |     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:328)
tutorial-connect-1 |     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:137)
tutorial-connect-1 |     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
tutorial-connect-1 |     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
tutorial-connect-1 |     at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
tutorial-connect-1 |     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
tutorial-connect-1 |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
tutorial-connect-1 |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
tutorial-connect-1 |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
tutorial-connect-1 |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
tutorial-connect-1 |     at java.base/java.lang.Thread.run(Thread.java:829)
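For reference, overrides of this kind allow larger records to pass the producer; the property names are standard Kafka ones, but the values and the environment-variable mapping for the tutorial images below are indicative only, not taken from the report:

# Kafka Connect worker: raise the producer request size limit
#   producer.max.request.size=16777216
#   (on the debezium/connect tutorial image this is assumed to be settable via the
#    CONNECT_PRODUCER_MAX_REQUEST_SIZE environment variable)
#
# Kafka broker: raise the broker-wide record size limit
#   message.max.bytes=16777216
#   (assumed to map from KAFKA_MESSAGE_MAX_BYTES on the debezium/kafka image)
#
# Or raise the limit only for the affected topic, from any host with the Kafka CLI tools:
kafka-configs.sh --bootstrap-server kafka:9092 --alter \
  --entity-type topics --entity-name db.public.large_columns \
  --add-config max.message.bytes=16777216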
Do you have the connector logs, ideally from start till finish?
First error (RecordTooLargeException):
tutorial-connect-1 | 2023-01-19 20:00:36,852 INFO || 1 records sent during previous 00:02:41.968, last recorded offset of {server=db} partition is {transaction_id=null, lsn_proc=34628064, lsn_commit=34529960, lsn=34628064, txId=767, ts_usec=1674158436234989} [io.debezium.connector.common.BaseSourceTask]
tutorial-connect-1 | 2023-01-19 20:00:36,889 ERROR || WorkerSourceTask{id=inventory-connector-0} failed to send record to db.public.large_columns: [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
tutorial-connect-1 | org.apache.kafka.common.errors.RecordTooLargeException: The message is 8002329 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
tutorial-connect-1 | 2023-01-19 20:00:41,896 ERROR || WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
tutorial-connect-1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:312)
tutorial-connect-1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToPollTask(WorkerSourceTask.java:113)
tutorial-connect-1 |     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:342)
tutorial-connect-1 |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
tutorial-connect-1 |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
tutorial-connect-1 |     at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)
tutorial-connect-1 |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
tutorial-connect-1 |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
tutorial-connect-1 |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
tutorial-connect-1 |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
tutorial-connect-1 |     at java.base/java.lang.Thread.run(Thread.java:829)
tutorial-connect-1 | Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 8002329 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
tutorial-connect-1 | 2023-01-19 20:00:41,904 INFO || Stopping down connector [io.debezium.connector.common.BaseSourceTask]
Second error (DebeziumException):
tutorial-connect-1 | 2023-01-19 20:04:48,983 INFO Postgres|db|postgres-connector-task Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/20EE2A8}, catalogXmin=762] [io.debezium.connector.postgresql.connection.PostgresConnection]
tutorial-connect-1 | 2023-01-19 20:04:48,984 INFO Postgres|db|postgres-connector-task Found previous offset PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='db'db='postgres', lsn=LSN{0/20EE148}, txId=766, lastCommitLsn=LSN{0/20EE148}, timestamp=2023-01-19T19:57:53.704983Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{0/20EE148}, lastCommitLsn=LSN{0/20EE148}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.postgresql.PostgresConnectorTask]
tutorial-connect-1 | 2023-01-19 20:04:48,985 INFO Postgres|db|postgres-connector-task Requested thread factory for connector PostgresConnector, id = db named = change-event-source-coordinator [io.debezium.util.Threads]
tutorial-connect-1 | 2023-01-19 20:04:48,985 INFO Postgres|db|postgres-connector-task Creating thread debezium-postgresconnector-db-change-event-source-coordinator [io.debezium.util.Threads]
tutorial-connect-1 | 2023-01-19 20:04:48,986 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.AbstractWorkerSourceTask]
tutorial-connect-1 | 2023-01-19 20:04:48,986 INFO Postgres|db|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:48,986 INFO Postgres|db|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:48,987 INFO Postgres|db|snapshot Previous snapshot has completed successfully, streaming logical changes from last known position [io.debezium.connector.postgresql.snapshot.InitialSnapshotter]
tutorial-connect-1 | 2023-01-19 20:04:48,987 INFO Postgres|db|snapshot According to the connector configuration no snapshot will be executed [io.debezium.connector.postgresql.PostgresSnapshotChangeEventSource]
tutorial-connect-1 | 2023-01-19 20:04:48,987 INFO Postgres|db|snapshot Snapshot ended with SnapshotResult [status=SKIPPED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='db'db='postgres', lsn=LSN{0/20EE148}, txId=766, lastCommitLsn=LSN{0/20EE148}, timestamp=2023-01-19T19:57:53.704983Z, snapshot=FALSE, schema=, table=], lastSnapshotRecord=false, lastCompletelyProcessedLsn=LSN{0/20EE148}, lastCommitLsn=LSN{0/20EE148}, streamingStoppingLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], incrementalSnapshotContext=IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:48,987 INFO Postgres|db|streaming Connected metrics set to 'true' [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:49,061 INFO Postgres|db|streaming REPLICA IDENTITY for 'public.large_columns' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns [io.debezium.connector.postgresql.PostgresSchema]
tutorial-connect-1 | 2023-01-19 20:04:49,061 INFO Postgres|db|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:49,062 INFO Postgres|db|streaming Retrieved latest position from stored offset 'LSN{0/20EE148}' [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
tutorial-connect-1 | 2023-01-19 20:04:49,062 INFO Postgres|db|streaming Looking for WAL restart position for last commit LSN 'LSN{0/20EE148}' and last change LSN 'LSN{0/20EE148}' [io.debezium.connector.postgresql.connection.WalPositionLocator]
tutorial-connect-1 | 2023-01-19 20:04:49,062 INFO Postgres|db|streaming Initializing PgOutput logical decoder publication [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
tutorial-connect-1 | 2023-01-19 20:04:49,225 INFO Postgres|db|streaming Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/20EE2A8}, catalogXmin=762] [io.debezium.connector.postgresql.connection.PostgresConnection]
tutorial-connect-1 | 2023-01-19 20:04:49,226 INFO Postgres|db|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
tutorial-connect-1 | 2023-01-19 20:04:49,227 INFO Postgres|db|streaming Seeking to LSN{0/20EE148} on the replication slot with command SELECT pg_replication_slot_advance('debezium', '0/20EE148') [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
tutorial-connect-1 | 2023-01-19 20:04:49,234 ERROR Postgres|db|streaming Producer failure [io.debezium.pipeline.ErrorHandler]
tutorial-connect-1 | io.debezium.DebeziumException: Cannot seek to the last known offset '0/20EE148' on replication slot 'debezium'. Error from server: ERROR: cannot advance replication slot to 0/20EE148, minimum is 0/20EE2A8
tutorial-connect-1 |     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.validateSlotIsInExpectedState(PostgresReplicationConnection.java:376)
tutorial-connect-1 |     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:328)
tutorial-connect-1 |     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:137)
tutorial-connect-1 |     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:41)
tutorial-connect-1 |     at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
tutorial-connect-1 |     at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:141)
tutorial-connect-1 |     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
tutorial-connect-1 |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
tutorial-connect-1 |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
tutorial-connect-1 |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
tutorial-connect-1 |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
tutorial-connect-1 |     at java.base/java.lang.Thread.run(Thread.java:829)
tutorial-connect-1 | 2023-01-19 20:04:49,235 INFO Postgres|db|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
tutorial-connect-1 | 2023-01-19 20:04:49,241 INFO Postgres|db|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
tutorial-connect-1 | 2023-01-19 20:04:49,241 INFO Postgres|db|streaming Finished streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:49,241 INFO Postgres|db|streaming Connected metrics set to 'false' [io.debezium.pipeline.ChangeEventSourceCoordinator]
tutorial-connect-1 | 2023-01-19 20:04:49,488 ERROR || WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
tutorial-connect-1 | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
How to reproduce the issue using our tutorial deployment?
docker-compose up -d
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

postgres=# create table large_columns (a bigserial primary key, b text);
CREATE TABLE
postgres=# insert into large_columns (b) values (repeat('abcdefgh', 10));
INSERT 0 1
After Kafka Connect flushes the offsets:
➜ ~ kaf consume my_connect_offsets -f
Key: [ "inventory-connector", { "server": "db" } ]
Partition: 14
Offset: 0
Timestamp: 2023-01-19 20:58:07.277 +0100 CET
{ "lsn": 34529608, "lsn_proc": 34529608, "transaction_id": null, "ts_usec": 1674158273704983, "txId": 766 }

postgres=# select slot_name, active, restart_lsn, restart_lsn - '0/0' as restart_lsn_int, confirmed_flush_lsn, confirmed_flush_lsn - '0/0' as confirmed_flush_lsn_int from pg_replication_slots ;
┌───────────┬────────┬─────────────┬─────────────────┬─────────────────────┬─────────────────────────┐
│ slot_name │ active │ restart_lsn │ restart_lsn_int │ confirmed_flush_lsn │ confirmed_flush_lsn_int │
├───────────┼────────┼─────────────┼─────────────────┼─────────────────────┼─────────────────────────┤
│ debezium  │ t      │ 0/2070530   │ 34014512        │ 0/20EE148           │ 34529608                │
└───────────┴────────┴─────────────┴─────────────────┴─────────────────────┴─────────────────────────┘
(1 row)
Then insert a large row to trigger the RecordTooLargeException:
postgres=# insert into large_columns (b) values (repeat('abcdefgh', 1000000));
INSERT 0 1
postgres=# select slot_name, active, restart_lsn, restart_lsn - '0/0' as restart_lsn_int, confirmed_flush_lsn, confirmed_flush_lsn - '0/0' as confirmed_flush_lsn_int from pg_replication_slots ;
┌───────────┬────────┬─────────────┬─────────────────┬─────────────────────┬─────────────────────────┐
│ slot_name │ active │ restart_lsn │ restart_lsn_int │ confirmed_flush_lsn │ confirmed_flush_lsn_int │
├───────────┼────────┼─────────────┼─────────────────┼─────────────────────┼─────────────────────────┤
│ debezium  │ f      │ 0/2070530   │ 34014512        │ 0/20EE2A8           │ 34529960                │
└───────────┴────────┴─────────────┴─────────────────┴─────────────────────┴─────────────────────────┘
No new message is produced to the my_connect_offsets topic, while the slot's confirmed_flush_lsn has advanced from 0/20EE148 to 0/20EE2A8. Maybe this is because the record was large and chunked by Postgres?
Restarting the task with
curl -X POST localhost:8083/connectors/inventory-connector/tasks/0/restart
then leads to the issue above: the LSN stored in the offsets topic (0/20EE148) is BEHIND the one on the slot (0/20EE2A8).
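The mismatch can be verified directly from psql by converting both positions to byte offsets. This is just a sketch using the literal values captured above; how you reach psql depends on your setup (e.g. docker-compose exec postgres psql -U postgres):

# Offsets topic holds 0/20EE148 (34529608); the slot's confirmed_flush_lsn is 0/20EE2A8 (34529960),
# i.e. the slot is 352 bytes ahead, which is why pg_replication_slot_advance() refuses to go back.
psql -U postgres -c "
  select '0/20EE148'::pg_lsn - '0/0'               as offset_topic_lsn,
         confirmed_flush_lsn - '0/0'               as slot_confirmed_flush_lsn,
         confirmed_flush_lsn - '0/20EE148'::pg_lsn as slot_ahead_by_bytes
  from pg_replication_slots
  where slot_name = 'debezium';"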