Debezium · DBZ-8048

Debezium Postgres connector fails on incremental snapshot when CloudEventsConverter is used


    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major
    • Affects Version: 3.0.0.CR1
    • Component: postgresql-connector

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, please provide the following information:

      What Debezium connector do you use and what version?

      debezium-connector-postgres 2.6.2.Final

      What is the connector configuration?

        class: io.debezium.connector.postgresql.PostgresConnector
        tasksMax: 1
        config:
          tasks.max: 1
          database.hostname: 'pgdbdev.vpn.servicetitan.com'
          database.port: '5432'
          database.user: 'dataplatform_ro'
          database.password: '${file://opt/kafka/connect-password/debezium-secret/connectorConfig:pgDatabasePassword}'
          database.dbname: 'fleet_alert'
          database.server.name: 'pgdbdev'
          table.include.list: 'public.debezium_signal,fleet.Recipient,fleet.AlertType,fleet.Alert'
          slot.name: 'debezium_fleet_alert'
          plugin.name: 'pgoutput'
          publication.name: 'dbz_publication'
          topic.prefix: 'alert'
          transforms: 'Reroute'
          transforms.Reroute.type: 'io.debezium.transforms.ByLogicalTableRouter'
          transforms.Reroute.topic.regex: '.*'
          transforms.Reroute.topic.replacement: 'debezium_test_fleet_alert'
          key.converter: 'org.apache.kafka.connect.storage.StringConverter'
          key.converter.schemas.enable: false
          value.converter: 'io.debezium.converters.CloudEventsConverter'
          value.converter.serializer.type: 'json'
          value.converter.schemas.enable: false
          include.schema.changes: true
          skipped.operations: 'none'
          signal.data.collection: 'public.debezium_signal'
       
       
      (the tables list is trimmed)

      What is the captured database version and mode of deployment?

      On-premises "PostgreSQL 15.4 (Ubuntu 15.4-1.pgdg22.04+1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0, 64-bit"

      The issue also reproduces on Azure Database for PostgreSQL Flexible Server.

      What behavior do you expect?

      After triggering an incremental snapshot for a specific table via the signal table, Debezium should perform the snapshot and keep working.

      insert into public.debezium_signal
      values ('0', 'execute-snapshot', '{"type": "incremental", "data-collections": ["alert.\"AlertType\""]}')
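      For context, the table named in signal.data.collection has to exist with the three-column layout Debezium expects before signals like the one above can be inserted. A minimal sketch, with column lengths taken from the examples in the Debezium signaling documentation (adjust names and sizes as needed):

      CREATE TABLE public.debezium_signal (
          id   VARCHAR(42) PRIMARY KEY,  -- arbitrary unique id of the signal
          type VARCHAR(32) NOT NULL,     -- e.g. 'execute-snapshot'
          data VARCHAR(2048)             -- JSON payload with snapshot parameters
      );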

      What behavior do you see?

      The connector crashes and does not start until the value converter is changed to
      org.apache.kafka.connect.json.JsonConverter.
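
      For anyone hitting the same crash: the workaround amounts to swapping the value converter in the connector config shown above. A minimal sketch of the changed properties (this only sidesteps the problem, CloudEvents envelopes are no longer produced):

          # Workaround: replace CloudEventsConverter with the plain Kafka Connect
          # JSON converter until the incremental-snapshot issue is fixed.
          value.converter: 'org.apache.kafka.connect.json.JsonConverter'
          value.converter.schemas.enable: false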

      Do you see the same behaviour using the latest released Debezium version?

      Yes, for the latest released non-Alpha/Beta/CR version.

      Do you have the connector logs, ideally from start till finish?

      Error log:

      2024-07-11 08:01:32,627 INFO [debezium-connector-fleet-alert|task-0] Requested 'INCREMENTAL' snapshot of data collections '[fleet."AlertType"]' with additional conditions '[]' and surrogate key 'PK of table will be used' (io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot) [debezium-postgresconnector-fleet-alert-change-event-source-coordinator]
      2024-07-11 08:01:32,643 INFO [debezium-connector-fleet-alert|task-0] Incremental snapshot for table 'fleet.AlertType' will end at position [19] (io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource) [debezium-postgresconnector-fleet-alert-change-event-source-coordinator]
      2024-07-11 08:01:32,652 INFO [debezium-connector-fleet-alert|task-0] No data returned by the query, incremental snapshotting of table 'fleet.AlertType' finished (io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource) [debezium-postgresconnector-fleet-alert-change-event-source-coordinator]
      2024-07-11 08:01:32,655 INFO [debezium-connector-fleet-alert|task-0] Skipping read chunk because snapshot is not running (io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource) [debezium-postgresconnector-fleet-alert-change-event-source-coordinator]
      2024-07-11 08:01:33,059 WARN [debezium-connector-fleet-alert|task-0] The Kafka Connect schema name 'fleet-alert.fleet_alert.CloudEvents.Envelope' is not a valid Avro schema name, so replacing with 'fleet_alert.fleet_alert.CloudEvents.Envelope' (io.debezium.schema.SchemaNameAdjuster) [task-thread-debezium-connector-fleet-alert-0]
      2024-07-11 08:01:33,059 WARN [debezium-connector-fleet-alert|task-0] The Kafka Connect schema name 'fleet-alert.fleet_alert.CloudEvents.Envelope' is not a valid Avro schema name, so replacing with 'fleet_alert.fleet_alert.CloudEvents.Envelope' (io.debezium.schema.SchemaNameAdjuster) [task-thread-debezium-connector-fleet-alert-0]
      2024-07-11 08:01:33,059 WARN [debezium-connector-fleet-alert|task-0] The Kafka Connect schema name 'fleet-alert.fleet_alert.CloudEvents.Envelope' is not a valid Avro schema name, so replacing with 'fleet_alert.fleet_alert.CloudEvents.Envelope' (io.debezium.schema.SchemaNameAdjuster) [task-thread-debezium-connector-fleet-alert-0]
      2024-07-11 08:01:33,061 INFO [debezium-connector-fleet-alert|task-0] WorkerSourceTask{id=debezium-connector-fleet-alert-0} Committing offsets for 2 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-debezium-connector-fleet-alert-0]
      2024-07-11 08:01:33,066 ERROR [debezium-connector-fleet-alert|task-0] WorkerSourceTask{id=debezium-connector-fleet-alert-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-debezium-connector-fleet-alert-0]
      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
          at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)

       

      The full log from an attempt to restart the connector is attached.

            Assignee: Unassigned
            Reporter: mdubovik78 Mikhail Dubovik (Inactive)