Type: Bug
Resolution: Unresolved
Priority: Major
Affects Versions: 2.7.3.Final, 3.0.0.Final
See this topic for additional context:
I have this configuration:
Postgres Source table -> Debezium postgres source connector -> Kafka topic using Avro schema -> Debezium JDBC sink connector -> Postgres target table
Any inserts, updates, and deletes done on the source table need to be reflected on the target table. Inserts and updates work as expected. However, deletes performed on the source table are not reflected on the target table when ExtractNewRecordState is used in the source connector config.
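For context, my understanding of the rewrite-with-tombstone mode configured below is that a delete should appear on the topic as a flattened record whose value carries a __deleted marker, followed by a tombstone (null value) for the same key. A rough sketch of the two records for a deleted row, with illustrative values only (this is not captured output):

// Record 1: flattened delete event as rewritten by ExtractNewRecordState
// (values below are illustrative, not captured output)
key:   { "id": 42 }
value: { "id": 42, "name": null, "third_party_code": null, "__deleted": "true" }

// Record 2: tombstone emitted for the same key
key:   { "id": 42 }
value: null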
Here is the source connector config:
{ "name": "source-postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "sa", "database.password": "****", "database.dbname": "postgres", "schema.include.list": "public", "table.include.list": "public.devicetype", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schemas.enable": "true", "key.converter.schema.registry.url": "http://host.docker.internal:9081", "key.converter.auto.register.schemas": false, "key.converter.use.latest.version": true, "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schemas.enable": "true", "value.converter.schema.registry.url": "http://host.docker.internal:9081", "value.converter.auto.register.schemas": false, "value.converter.use.latest.version": true, "topic.prefix": "db", "transforms": "Unwrap,Reroute", "transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.Unwrap.delete.tombstone.handling.mode": "rewrite-with-tombstone", "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex": "([^\\.]+)\\.[^\\.]+\\.([^\\.]+)$", "transforms.Reroute.topic.replacement": "$1_$2" } }
Here is the JDBC sink connector config:
{ "name": "jdbc-sink-connector", "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schemas.enable": "true", "key.converter.schema.registry.url": "http://host.docker.internal:9081", "key.converter.auto.register.schemas": false, "key.converter.use.latest.version": true, "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schemas.enable": "true", "value.converter.schema.registry.url": "http://host.docker.internal:9081", "value.converter.auto.register.schemas": false, "value.converter.use.latest.version": true, "topics": "db_devicetype", "tasks.max": "1", "connection.url": "jdbc:postgresql://postgres/postgres", "connection.username": "sa", "connection.password": "****", "insert.mode": "upsert", "delete.enabled": true, "primary.key.mode": "record_key", "primary.key.fields": "id", "schema.evolution": "none", "table.name.format": "read_only_device_type", "field.exclude.list": "db_devicetype:__deleted", } }
I use this Dockerfile to create kafka connect image that supports JDBC sink connector:
FROM confluentinc/cp-kafka-connect-base:latest

ENV CONNECT_PLUGIN_PATH="${CONNECT_PLUGIN_PATH},/usr/share/debezium-connector-jdbc"

RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest

USER root
RUN wget "https://repo1.maven.org/maven2/io/debezium/debezium-connector-jdbc/2.7.0.Final/debezium-connector-jdbc-2.7.0.Final-plugin.tar.gz"
RUN mkdir /usr/share/debezium-connector-jdbc
RUN tar -xzf debezium-connector-jdbc-2.7.0.Final-plugin.tar.gz -C /usr/share/debezium-connector-jdbc
RUN rm debezium-connector-jdbc-2.7.0.Final-plugin.tar.gz
This is the docker compose yml file to launch various services:
version: '3.8'

services:
  postgres:
    image: debezium/example-postgres
    container_name: postgres
    environment:
      POSTGRES_USER: sa
      POSTGRES_PASSWORD: root
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  broker:
    image: confluentinc/confluent-local:7.6.0
    container_name: broker
    ports:
      - 29092:29092
    environment:
      KAFKA_LISTENERS: internal_listener://broker:9092,external_listener://broker:29092,CONTROLLER://broker:9093
      KAFKA_ADVERTISED_LISTENERS: internal_listener://broker:9092,external_listener://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: internal_listener:PLAINTEXT,external_listener:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: internal_listener
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:9093"
      KAFKA_NUM_PARTITIONS: 6

  kafka-connect:
    build:
      context: .
      dockerfile: Dockerfile
    hostname: kafka-connect
    container_name: kafka-connect
    depends_on:
      - broker
      - zookeeper
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_CONNECTIONS_MAX_IDLE_MS: 180000
      CONNECT_METADATA_MAX_AGE_MS: 180000
      CONNECT_AUTO_CREATE_TOPICS_ENABLE: "true"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

  schema-registry:
    image: confluentinc/cp-schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "9081:8081"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_DEBUG: true

volumes:
  postgres_data:
What Debezium connector do you use and what version?
Debezium JDBC sink connector 2.7.0.Final
Debezium Postgres connector latest version (see Dockerfile)
What is the connector configuration?
Included above.
What is the captured database version and mode of deployment?
See the docker compose file's contents I posted.
What behavior do you expect?
The Postgres source connector writes messages to the Kafka topic with value=null, or with __deleted=true added to the value, when a row is deleted from the source table. The JDBC sink connector should be able to process these messages and issue DELETE statements against the target table.
What behavior do you see?
Deletion of a row on the source table doesn't result in deletion of the corresponding row on the target table.
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
The source connector writes a Kafka message with value=NULL on a DELETE operation:
Source connector log:
kafka-connect | [2024-10-01 18:50:31,430] INFO 5 records sent during previous 00:01:36.31, last recorded offset of {server=db} partition is {transaction_id=null, lsn_proc=38674688, messageType=DELETE, lsn_commit=38674632, lsn=38674688, txId=879, ts_usec=1727808631354266} (io.debezium.connector.common.BaseSourceTask)
How to reproduce the issue using our tutorial deployment?
- Run the docker compose cluster using: docker compose up
- Use this query to create the source table:
CREATE TABLE IF NOT EXISTS userop_devicetype
(
id SERIAL PRIMARY KEY,
name text NULL,
third_party_code text NULL
);
- Use this query to create the target table:
CREATE TABLE IF NOT EXISTS read_only_device_type
(
id int NOT NULL,
name text NULL,
third_party_code text NULL,
CONSTRAINT read_only_device_type_pkey PRIMARY KEY (id)
);
- Insert a few rows into the source table
- Register these Avro schemas against the schema registry for the key and value (a sketch of the registration payload appears after these steps):
[ { "subject": "db_userop_devicetype-key", "version": 1, "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"db_userop_devicetype\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"db_userop_devicetype.Key\"}" }, { "subject": "db_userop_devicetype-value", "version": 1, "schema": "{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"db_userop_devicetype\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"third_party_code\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"db_userop_devicetype.Value\"}" } ]
- Register the source connector with this command:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @postgres-source-connector.json
- Now you should see the data in the Kafka topic. Next, register the sink connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @debezium-jdbc-sink-connector.json
- Now you should see the data on the target database.
- Delete a row from the source table. Verify that a message is written to the Kafka topic; however, the corresponding row is not deleted from the target table.
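For the schema registration step above, one way to register each schema is to POST it to the Schema Registry's /subjects/<subject>/versions endpoint (the registry is exposed on port 9081 in the compose file). The annotated body below is a sketch for the key subject, reusing the schema string from the list above; the endpoint path and body shape are the standard Confluent Schema Registry API rather than something taken from the report:

// POST this body to the assumed endpoint (not shown in the report):
//   http://localhost:9081/subjects/db_userop_devicetype-key/versions
{
  "schemaType": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"db_userop_devicetype\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"db_userop_devicetype.Key\"}"
}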
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
Fixing this issue would allow us to accurately reflect delete operations on the target database. Note that deletes work when I don't use ExtractNewRecordState, but I need it to map the source table schema to my Kafka topic's Avro schema.
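For comparison, the working variant mentioned above (no ExtractNewRecordState) would roughly correspond to a transforms section like the fragment below. The exact configuration used without the SMT is not included in this report, so this is only an illustrative sketch with the remaining source connector properties unchanged:

// Illustrative fragment only - the config actually used without the SMT is not shown in this report
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "([^\\.]+)\\.[^\\.]+\\.([^\\.]+)$",
"transforms.Reroute.topic.replacement": "$1_$2"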
Implementation ideas (optional)
<Your answer>