Debezium / DBZ-8287

JDBC sink connector doesn't delete rows from a postgres db table


      See this topic for additional context:

      https://debezium.zulipchat.com/#narrow/stream/383534-community-jdbc/topic/Rows.20don't.20get.20deleted.20in.20this.20case

      I have this configuration:

      Postgres Source table -> Debezium postgres source connector -> Kafka topic using Avro schema -> Debezium JDBC sink connector -> Postgres target table

      Any insert, update, or delete on the source table needs to be reflected on the target table. Inserts and updates work as expected. However, deletes performed on the source table are not reflected on the target table when ExtractNewRecordState is used in the source connector config.

      Here is the source connector config:

       

      {
        "name": "source-postgres-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "tasks.max": "1",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "sa",
          "database.password": "****",
          "database.dbname": "postgres",
          "schema.include.list": "public",
          "table.include.list": "public.devicetype",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schemas.enable": "true",
          "key.converter.schema.registry.url": "http://host.docker.internal:9081",
          "key.converter.auto.register.schemas": false,
          "key.converter.use.latest.version": true,
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schemas.enable": "true",
          "value.converter.schema.registry.url": "http://host.docker.internal:9081",
          "value.converter.auto.register.schemas": false,
          "value.converter.use.latest.version": true,
          "topic.prefix": "db",
          "transforms": "Unwrap,Reroute",
          "transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
          "transforms.Unwrap.delete.tombstone.handling.mode": "rewrite-with-tombstone",
          "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
          "transforms.Reroute.topic.regex": "([^\\.]+)\\.[^\\.]+\\.([^\\.]+)$",
          "transforms.Reroute.topic.replacement": "$1_$2"
        }
      }
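
      With delete.tombstone.handling.mode set to rewrite-with-tombstone, a delete should appear on the topic as a rewritten record carrying __deleted=true, followed by a tombstone (null value). To confirm what actually lands on the rerouted topic, something like the following can be run from inside the Docker network, e.g. from the schema-registry container (a sketch using standard kafka-avro-console-consumer options; the topic name and registry address follow the configs in this report):

        # Print keys alongside values so tombstones (null values) are visible.
        kafka-avro-console-consumer \
          --bootstrap-server broker:9092 \
          --topic db_devicetype \
          --from-beginning \
          --property schema.registry.url=http://schema-registry:8081 \
          --property print.key=true \
          --property key.separator=' => '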


      Here is the JDBC sink connector config:

       

      {
          "name": "jdbc-sink-connector",
          "config": {
              "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
              "key.converter": "io.confluent.connect.avro.AvroConverter",
              "key.converter.schemas.enable": "true",
              "key.converter.schema.registry.url": "http://host.docker.internal:9081",
              "key.converter.auto.register.schemas": false,
              "key.converter.use.latest.version": true,
              "value.converter": "io.confluent.connect.avro.AvroConverter",
              "value.converter.schemas.enable": "true",
              "value.converter.schema.registry.url": "http://host.docker.internal:9081",
              "value.converter.auto.register.schemas": false,
              "value.converter.use.latest.version": true,
              "topics": "db_devicetype",
              "tasks.max": "1",
              "connection.url": "jdbc:postgresql://postgres/postgres",
              "connection.username": "sa",
              "connection.password": "****",
              "insert.mode": "upsert",
              "delete.enabled": true,
              "primary.key.mode": "record_key",
              "primary.key.fields": "id",
              "schema.evolution": "none",
              "table.name.format": "read_only_device_type",
              "field.exclude.list": "db_devicetype:__deleted",
          }
      }
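
      Once registered, the sink connector's state can be checked through the standard Connect REST API; a FAILED task here would explain missing deletes (a quick check; jq is optional and assumed to be installed):

        # Confirm the sink connector and its task are RUNNING, and show any error trace.
        curl -s http://localhost:8083/connectors/jdbc-sink-connector/status | jq .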


      I use this Dockerfile to create a Kafka Connect image that supports the JDBC sink connector:

      FROM confluentinc/cp-kafka-connect-base:latest
      ENV CONNECT_PLUGIN_PATH="${CONNECT_PLUGIN_PATH},/usr/share/debezium-connector-jdbc" 
      RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest 
      USER root 
      RUN wget "https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.7.0.Final/debezium-connector-jdbc-2.7.0.Final-plugin.tar.gz" 
      RUN mkdir /usr/share/debezium-connector-jdbc
      RUN tar -xzf debezium-connector-jdbc-2.7.0.Final-plugin.tar.gz -C /usr/share/debezium-connector-jdbc
      RUN rm debezium-connector-jdbc-2.7.0.Final-plugin.tar.gz
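
      Once the stack is up, it may be worth confirming that Connect actually discovered both plugins from this image (a quick check against the standard Connect REST endpoint):

        # List the installed connector plugin classes.
        curl -s http://localhost:8083/connector-plugins | jq '.[].class'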

       
      This is the Docker Compose file used to launch the services:
      version: '3.8'

      services:
        postgres:
          image: debezium/example-postgres
          container_name: postgres
          environment:
            POSTGRES_USER: sa
            POSTGRES_PASSWORD: root
          ports:
            - "5432:5432"
          volumes:
            - postgres_data:/var/lib/postgresql/data

        zookeeper:
          image: confluentinc/cp-zookeeper:latest
          container_name: zookeeper
          environment:
            ZOOKEEPER_CLIENT_PORT: 2181
            ZOOKEEPER_TICK_TIME: 2000
          ports:
            - "2181:2181"

        broker:
          image: confluentinc/confluent-local:7.6.0
          container_name: broker
          ports:
            - 29092:29092
          environment:
            KAFKA_LISTENERS: internal_listener://broker:9092,external_listener://broker:29092,CONTROLLER://broker:9093
            KAFKA_ADVERTISED_LISTENERS: internal_listener://broker:9092,external_listener://localhost:29092
            KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: internal_listener:PLAINTEXT,external_listener:PLAINTEXT,CONTROLLER:PLAINTEXT
            KAFKA_INTER_BROKER_LISTENER_NAME: internal_listener
            KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:9093"
            KAFKA_NUM_PARTITIONS: 6

        kafka-connect:
          build:
            context: .
            dockerfile: Dockerfile
          hostname: kafka-connect
          container_name: kafka-connect
          depends_on:
            - broker
            - zookeeper
            - schema-registry
          ports:
            - "8083:8083"
          environment:
            CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
            CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
            CONNECT_REST_PORT: 8083
            CONNECT_GROUP_ID: kafka-connect-cluster-group
            CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
            CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
            CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
            CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
            CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
            CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
            CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
            CONNECT_METADATA_MAX_AGE_MS: 180000
            CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
            CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
            CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

        schema-registry:
          image: confluentinc/cp-schema-registry
          container_name: schema-registry
          depends_on:
            - broker
          ports:
            - "9081:8081"
          environment:
            SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
            SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
            SCHEMA_REGISTRY_DEBUG: true

      volumes:
        postgres_data:

      What Debezium connector do you use and what version?

      Debezium JDBC sink connector 2.7.0.Final

      Debezium Postgres connector latest version (see Dockerfile)

      What is the connector configuration?

      Included above.

      What is the captured database version and mode of deployment?

      See the docker compose file's contents I posted.

      What behavior do you expect?

      The Postgres source connector writes messages to the Kafka topic with value=null (a tombstone) and/or with __deleted=true when a row is deleted from the source table. The JDBC sink connector should be able to process these messages and issue DELETE statements against the target table.
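
      A quick way to check whether a delete reached the target table is to query it directly (a sketch; id = 1 is a hypothetical value, and the credentials and database name follow the configs above):

        # Check whether the deleted row is still present in the target table.
        docker exec -it postgres psql -U sa -d postgres \
          -c "SELECT * FROM read_only_device_type WHERE id = 1;"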

      What behavior do you see?

      Deletion of a row on the source table doesn't result in deletion of the corresponding row on the target table.

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      The source connector writes a Kafka message with value=NULL on a DELETE operation.

      Source connector log:

      kafka-connect    | [2024-10-01 18:50:31,430] INFO 5 records sent during previous 00:01:36.31, last recorded offset of {server=db} partition is {transaction_id=null, lsn_proc=38674688, messageType=DELETE, lsn_commit=38674632, lsn=38674688, txId=879, ts_usec=1727808631354266} (io.debezium.connector.common.BaseSourceTask)
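
      If DEBUG/TRACE output is needed, the log level for Debezium classes can be raised at runtime through Kafka Connect's admin REST endpoint (a sketch; the level reverts when the worker restarts):

        # Set io.debezium loggers to TRACE on the running worker.
        curl -s -X PUT -H "Content-Type: application/json" \
          -d '{"level": "TRACE"}' \
          http://localhost:8083/admin/loggers/io.debezium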

      How to reproduce the issue using our tutorial deployment?

      • Run the docker compose cluster using: docker compose up
      • Use this query to create source table:
        CREATE TABLE IF NOT EXISTS userop_devicetype
        (
            id SERIAL PRIMARY KEY,
            name                             text                            NULL,
            third_party_code                 text                            NULL
        );
      • Use this query to create the target table:
        CREATE TABLE IF NOT EXISTS read_only_device_type
        (
            id                               int                             NOT NULL,
            name                             text                            NULL,
            third_party_code                 text                            NULL,
            CONSTRAINT read_only_device_type_pkey PRIMARY KEY (id)
        );

      • Insert a few rows into the source table (see the sample commands after this list)
      • Register these Avro schemas against the schema registry for the key and value (a sample registration command appears after this list):
      [
          {
              "subject": "db_userop_devicetype-key",
              "version": 1,
              "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"db_userop_devicetype\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"db_userop_devicetype.Key\"}"
          },
          {
              "subject": "db_userop_devicetype-value",
              "version": 1,
              "schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"db_userop_devicetype\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"third_party_code\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"db_userop_devicetype.Value\"}"
          }
      ]
      • Register source connector with this command:
        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @postgres-source-connector.json
      • Now you should see the data in the kafka topic. Next, register the sink connector:
        curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @debezium-jdbc-sink-connector.json
      • Now you should see the data in the target database
      • Delete a row on the source db table and verify that a message is written to the Kafka topic; however, the corresponding row will not be deleted on the target db table (see the sample commands after this list).
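
      The manual steps above can be scripted roughly as follows. This is a sketch: key-schema.json and value-schema.json are hypothetical files, each containing a body of the form {"schema": "<escaped Avro schema>"} for the matching subject above, and the sample row values are made up.

        # Register the key and value schemas (subject names match the export above).
        curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
          --data @key-schema.json \
          http://localhost:9081/subjects/db_userop_devicetype-key/versions
        curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
          --data @value-schema.json \
          http://localhost:9081/subjects/db_userop_devicetype-value/versions

        # Insert a sample row into the source table (hypothetical values).
        docker exec -it postgres psql -U sa -d postgres \
          -c "INSERT INTO userop_devicetype (name, third_party_code) VALUES ('sensor', 'tp-001');"

        # Delete it again; this should put a rewritten record plus a tombstone on the
        # topic and, per the expected behavior, issue a DELETE on the target table.
        docker exec -it postgres psql -U sa -d postgres \
          -c "DELETE FROM userop_devicetype WHERE name = 'sensor';"

        # The row should now be gone from the target table (it is not, which is the
        # bug reported here).
        docker exec -it postgres psql -U sa -d postgres \
          -c "SELECT * FROM read_only_device_type;"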

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      Fixing this issue would allow us to accurately reflect delete operations on the target database. Note that deletes work when I don't use ExtractNewRecordState, but I need it to map the source table schema to my Kafka topic's Avro schema.

      Implementation ideas (optional)

      <Your answer>
