Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-8150

Issue with Debezium Snapshot: DateTimeParseException with plugin pgoutput

XMLWordPrintable

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

       
      I'm encountering an issue with Debezium version 2.3.1 while processing a snapshot from a PostgreSQL database. The error occurs during the snapshot phase and seems to be related to parsing a date/time value. The specific error message is:

      Caused by: org.apache.kafka.connect.errors.ConnectException:     
      java.time.format.DateTimeParseException: Text 'f' could not be parsed at index 0
              at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.format(DateTimeFormat.java:166)
              at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.timestampToInstant(DateTimeFormat.java:172)
              at io.debezium.connector.postgresql.connection.AbstractColumnValue.asInstant(AbstractColumnValue.java:81)
              at io.debezium.connector.postgresql.connection.ReplicationMessageColumnValueResolver.resolveValue(ReplicationMessageColumnValueResolver.java:110)
              at io.debezium.connector.postgresql.connection.pgoutput.PgOutputReplicationMessage.getValue(PgOutputReplicationMessage.java:92)
              at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder$1.getValue(PgOutputMessageDecoder.java:748)
              at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.columnValues(PostgresChangeRecordEmitter.java:179)
              at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.getNewColumnValues(PostgresChangeRecordEmitter.java:125)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
              at io.debezium.connector.postgresql.PostgresChangeRecordEmitter.emitChangeRecords(PostgresChangeRecordEmitter.java:94)
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:296)
              ... 17 more
      Caused by: java.time.format.DateTimeParseException: Text 'f' could not be parsed at index 0
              at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2046)
              at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1874)
              at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.lambda$timestampToInstant$3(DateTimeFormat.java:172)
              at io.debezium.connector.postgresql.connection.DateTimeFormat$ISODateTimeFormat.format(DateTimeFormat.java:162)

      I'm testing with tutorial using export DEBEZIUM_VERSION=2.7 with standard connector and plugin decoderbufs  and I didn't get this issue, so I suspect that the error is due to some functionality linked to pgoutput.

      What Debezium connector do you use and what version?

      Debezium PostgreSQL connector, version 2.3.1.

      What is the connector configuration?

      {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "connector.displayName": "PostgreSQL",
        "database.user": "docker",
        "database.dbname": "exampledb",
        "transforms": "unwrap",
        "database.server.name": "localhost",
        "heartbeat.interval.ms": "60000",
        "database.port": "5432",
        "plugin.name": "pgoutput",
        "slot.max.retries": "10",
        "schema.include.list": "public",
        "slot.retry.delay.ms": "15000",
        "heartbeat.action.query": "INSERT INTO public.debezium_heartbeat VALUES ('debezium', now())",
        "decimal.handling.mode": "string",
        "database.hostname": "postgres",
        "database.password": "docker",
        "transforms.unwrap.drop.tombstones": "false",
        "signal.data.collection": "public.debezium_signal",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "table.include.list": "public.table, public.debezium_signal",
        "max.batche.size": "65000",
        "max.queue.size": "275000",
        "incremental.snapshot.chunk.size": "6500",
        "connector.id": "postgres",
        "topic.prefix": "db-prod"
      } 

       

       

      What is the captured database version and mode of deployment?

      PostgreSQL: 13.15, deployed on GKE 

      The connector is configured with the pgoutput plugin for capturing changes. 

      What behavior do you expect?

      I expect Debezium to correctly capture and process changes from the PostgreSQL database, including snapshot and subsequent updates, without encountering parsing errors.

      What behavior do you see?

      During the snapshot phase, Debezium encounters a parsing error related to date/time values. Specifically, the error is a DateTimeParseException, indicating that Debezium is attempting to parse an invalid date/time format ('f').

      Do you see the same behaviour using the latest released Debezium version?

      Yes. I tried updating debezium to 2.7.0.Final, 2.7.1.Final and 3.0.0.Alpha2

      Do you have the connector logs, ideally from start till finish?

      When I started snapshot I received this payload:

       

      {{}}

      { "payload": { "id": 28948709, "federal_tax_id": "XXX.XXX.XXX-XX", "external_id": "extXXXXXX", "first_name": "Nome da Empresa", "last_name": "N/A", "email": "contato@dominio.com", "phone": "+55 XX XXXXX-XXXX", "cellphone": "+55 XX XXXXX-XXXX", "address_street": "Rua Exemplo", "address_street_number": "XX", "address_complement": "Complemento Exemplo", "address_city_district": "Bairro Exemplo", "address_post_code": "XXXXX-XXX", "address_city": "Cidade Exemplo", "address_city_code": "XXXX", "address_state_code": "XX", "address_country": "BR", "address_latitude": "-XX.XXXX", "address_longitude": "-XX.XXXX", "address_geo": { "wkb": "AQEAACDmEAAARUdy+Q9RR8CwcmiR7Yw3wA==", "srid": 4326 }, "created_at": 1722989773163, "updated_at": 1723386966400, "is_company": true, "state_tax_id": "XX123456789", "official_name": "Nome da Empresa" } }

      {{}}

      When I did a update on table I receveid this payload:

      {{}}

      { "payload": { "id": 28948707, "federal_tax_id": "XXX.XXX.XXX-XX", "external_id": "extXXXXXX", "first_name": "Nome", "last_name": "Sobrenome", "email": "usuario@dominio.com", "phone": "+55 XX XXXXX-XXXX", "cellphone": "+55 XX XXXXX-XXXX", "address_street": "Avenida Exemplo", "address_street_number": "XX", "address_complement": "Sala XXX", "address_city_district": "Bairro Exemplo", "address_post_code": "XXXXX-XXX", "address_city": "Cidade Exemplo", "address_city_code": "XXXX", "address_state_code": "XX", "address_country": "BR", "address_latitude": "-XX.XXXX", "address_longitude": "-XX.XXXX", "created_at": 1722990056975, "updated_at": 1723406879566, "is_company": false, "state_tax_id": "XX987654321", "official_name": "Nome Sobrenome" } }

      {{}}

      When I got an error, I checked the logs and found that the columns received wrong changed values:

      
       {{2024-08-12 10:03:13,233 TRACE  Postgres|company-db-prod|streaming  Column: address_latitude(numeric)=-22.9068   [io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder]
      2024-08-12 10:03:13,233 TRACE  Postgres|company-db-prod|streaming  Column: address_longitude(numeric)=-43.1729   [io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder]
      2024-08-12 10:03:13,233 TRACE  Postgres|company-db-prod|streaming  Column: address_geo(geometry)=2024-08-07 00:20:56.975735   [io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder]
      }}

      How to reproduce the issue using our tutorial deployment?

      if you want to reproduce the scenario, follow the commands using tutorial setup:

       

      # Terminal 1 - start the deployment
      # Start the deployment
      export DEBEZIUM_VERSION=2.7
      docker-compose -f docker-compose-postgres.yaml up
      # Terminal 2
      # Create a signalling table
      echo "CREATE TABLE inventory.dbz_signal (id varchar(64), type varchar(32), data varchar(2048))" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"
      # Start Postgres connector, capture only customers table and enable signalling
      curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @- <<EOF
      {
          "name": "inventory-connector",
          "config": {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
              "tasks.max": "1",
              "database.hostname": "postgres",
              "database.port": "5432",
              "database.user": "postgres",
              "database.password": "postgres",
              "database.dbname": "postgres",
              "database.server.name": "dbserver1",
              "schema.include": "inventory",
              "table.include.list": "inventory.customers,inventory.dbz_signal",
              "signal.data.collection": "inventory.dbz_signal",
              "topic.prefix": "dbserver1" ,
              "plugin.name": "pgoutput"
          }
      }
      EOF
      # Terminal 2
      # Create a inventory.table_test
      echo "
      DROP EXTENSION IF EXISTS postgis;
      SET search_path TO inventory;
      CREATE EXTENSION IF NOT EXISTS postgis;
      CREATE SEQUENCE IF NOT EXISTS inventory.table_test_id_seq;
      CREATE TABLE IF NOT EXISTS inventory.table_test (
          id bigint NOT NULL DEFAULT nextval('inventory.table_test_id_seq'::regclass),
          federal_tax_id character varying(255) COLLATE pg_catalog.\"default\",
          external_id character varying(255) COLLATE pg_catalog.\"default\",
          first_name character varying(255) COLLATE pg_catalog.\"default\",
          last_name character varying(255) COLLATE pg_catalog.\"default\",
          email character varying(255) COLLATE pg_catalog.\"default\",
          phone character varying(255) COLLATE pg_catalog.\"default\",
          cellphone character varying(255) COLLATE pg_catalog.\"default\",
          address_street character varying(255) COLLATE pg_catalog.\"default\",
          address_street_number character varying(255) COLLATE pg_catalog.\"default\",
          address_complement character varying(255) COLLATE pg_catalog.\"default\",
          address_city_district character varying(255) COLLATE pg_catalog.\"default\",
          address_post_code character varying(255) COLLATE pg_catalog.\"default\",
          address_city character varying(255) COLLATE pg_catalog.\"default\",
          address_city_code character varying(255) COLLATE pg_catalog.\"default\",
          address_state_code character varying(255) COLLATE pg_catalog.\"default\",
          address_country character varying(255) COLLATE pg_catalog.\"default\",
          address_latitude numeric,
          address_longitude numeric,
          address_geo geometry(Point,4326) GENERATED ALWAYS AS (st_setsrid(st_makepoint((address_longitude)::double precision, (address_latitude)::double precision), 4326)) STORED,
          created_at timestamp without time zone NOT NULL DEFAULT now(),
          updated_at timestamp without time zone NOT NULL DEFAULT now(),
          is_company boolean NOT NULL DEFAULT false,
          state_tax_id character varying(255) COLLATE pg_catalog.\"default\",
          official_name character varying(255) COLLATE pg_catalog.\"default\",
          CONSTRAINT table_test_pkey PRIMARY KEY (id)
      );" | docker-compose -f docker-compose-postgres.yaml exec -T postgres bash -c "psql -U \$POSTGRES_USER -d postgres"
      # TERMINAL 2
      # Insert fake data into inventory.table_test
      echo "
      INSERT INTO inventory.table_test (federal_tax_id, external_id, first_name, last_name, email, phone, cellphone, address_street, address_street_number, address_complement, address_city_district, address_post_code, address_city, address_city_code, address_state_code, address_country, address_latitude, address_longitude, created_at, updated_at, is_company, state_tax_id, official_name)
      VALUES
      ('12345678901', 'EXT001', 'Luke', 'Skywalker', 'luke.skywalker@example.com', '555-1234', '555-5678', '123 Force Lane', '1', 'Apt 1', 'Jedi District', '12345', 'Tatooine', 'TAT', 'TAT', 'Tatooine', 34.0522, -118.2437, now(), now(), false, 'TAT98765', 'Jedi Order'),
      ('98765432109', 'EXT002', 'Leia', 'Organa', 'leia.organa@example.com', '555-8765', '555-4321', '456 Rebel Ave', '2', 'Suite 2', 'Rebel District', '54321', 'Alderaan', 'ALD', 'ALD', 'Alderaan', 40.7128, -74.0060, now(), now(), false, 'ALD54321', 'Rebel Alliance'),
      ('12398745601', 'EXT003', 'Han', 'Solo', 'han.solo@example.com', '555-3456', '555-6789', '789 Smuggler Road', '3', 'Floor 3', 'Smuggler District', '67890', 'Corellia', 'COR', 'COR', 'Corellia', 37.7749, -122.4194, now(), now(), false, 'COR12345', 'Millennium Falcon Co.'),
      ('32165498702', 'EXT004', 'Obi-Wan', 'Kenobi', 'obi.wan.kenobi@example.com', '555-9876', '555-5432', '101 Jedi Temple', '4', 'Unit 4', 'Temple District', '24680', 'Kamino', 'KAM', 'KAM', 'Kamino', 22.3964, 114.1095, now(), now(), false, 'KAM24680', 'Jedi Council'),
      ('65432109876', 'EXT005', 'Yoda', 'Master', 'yoda.master@example.com', '555-6543', '555-2109', '202 Green Swamp', '5', 'Green House', 'Swamp District', '13579', 'Dagobah', 'DAG', 'DAG', 'Dagobah', -21.2860, 149.1261, now(), now(), false, 'DAG13579', 'Jedi Masters Inc.')
      ;" | docker-compose -f docker-compose-postgres.yaml exec -T postgres bash -c "psql -U \$POSTGRES_USER -d postgres"
      # TERMINAL 2
      #add table inventory.tabela_test in table.include.list
      curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector/config -d @- <<EOF
      {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "tasks.max": "1",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname" : "postgres",
          "database.server.name": "dbserver1",
          "schema.include": "inventory",
          "table.include.list": "inventory.customers,inventory.dbz_signal,inventory.table_test ",
          "signal.data.collection": "inventory.dbz_signal",
           "topic.prefix": "dbserver1",
         "plugin.name": "pgoutput"
      }
      EOF
      # TERMINAL 2
      # Start incremental snapshot
      echo "INSERT INTO inventory.dbz_signal VALUES ('signal-1', 'execute-snapshot', '{\"data-collections\": [\"inventory.table_test\"]}')" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"
      # Terminal 3
      # update into inventory.tabela_test in the same time of snapshot
      echo "
      UPDATE inventory.table_test
      SET updated_at = current_timestamp
      WHERE federal_tax_id = '12345678901';
      " | docker-compose -f docker-compose-postgres.yaml exec -T postgres bash -c "psql -U \$POSTGRES_USER -d postgres"
      #OR
      #insert into inventory.table_test in the same time of snapshot
      echo "
      INSERT INTO inventory.table_test (federal_tax_id, external_id, first_name, last_name, email, phone, cellphone, address_street, address_street_number, address_complement, address_city_district, address_post_code, address_city, address_city_code, address_state_code, address_country, address_latitude, address_longitude, created_at, updated_at, is_company, state_tax_id, official_name)
      VALUES
      ('11122334455', 'EXT006', 'Mace', 'Windu', 'mace.windu@example.com', '555-6789', '555-4321', '789 Jedi Way', '6', 'Unit 6', 'Jedi District', '13579', 'Geonosis', 'GEO', 'GEO', 'Geonosis', -10.2860, 123.4567, now(), now(), false, 'GEO12345', 'Jedi Order'),
      ('22233445566', 'EXT007', 'Padmé', 'Amidala', 'padme.amidala@example.com', '555-5432', '555-8765', '456 Naboo Lane', '7', 'Suite 7', 'Naboo District', '24680', 'Naboo', 'NAB', 'NAB', 'Naboo', 32.7767, -96.7970, now(), now(), false, 'NAB54321', 'Naboo Senate')
      ;" | docker-compose -f docker-compose-postgres.yaml exec -T postgres bash -c "psql -U \$POSTGRES_USER -d postgres"
      

       

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      <Your answer>

      Implementation ideas (optional)

      <Your answer>

            jpechane Jiri Pechanec
            henriquevianaeq@gmail.com Henrique Viana (Inactive)
            Votes:
            1 Vote for this issue
            Watchers:
            6 Start watching this issue

              Created:
              Updated:
              Resolved: