-
Bug
-
Resolution: Done
-
Major
-
None
-
None
-
None
-
None
-
False
-
None
-
False
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
2.4.0.Beta1 OracleConnector, JdbcSinkConnector
What is the connector configuration?
source-connector
{ "name": "vk_nau55_src", "connector.class" : "io.debezium.connector.oracle.OracleConnector", "tasks.max" : "1", "database.hostname" : "***", "database.port" : "1521", "database.user" : "debezium", "database.password" : "***", "database.dbname": "NAUMENT1", "database.connection.adapter": "logminer", "schema.history.internal.kafka.topic": "vk_nau55_src.schema-changes", "schema.history.internal.kafka.bootstrap.servers": "broker1:29092,broker3:29092,broker3:29092", "schema.history.internal.store.only.captured.tables.ddl": "true", "schema.history.internal.store.only.captured.databases.ddl": "true", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://naument-sr:8081", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://naument-sr:8081", "errors.log.enable": "true", "snapshot.lock.timeout.ms":"5000", "include.schema.changes": "true", "snapshot.mode":"always", "decimal.handling.mode": "precise", "lob.enabled": "true", "datatype.propagate.source.type": ".*", "log.mining.session.max.ms": "120000", "topic.prefix": "vk_nau55", "topic.creation.enable": "true", "topic.creation.default.partitions": "1", "topic.creation.default.include": "vk_nau55\\.*", "topic.creation.default.replication.factor": "1", "topic.creation.default.compression.type": "lz4", "topic.creation.default.retention.ms": "432000000", "table.include.list" : "DEBEZIUM.GBC_TBL_SERVICECALL_NC55"}
sink-connector
{ "name": "vk_nau55_sink", "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://***:5438/db_ods_test?currentSchema=naument1", "connection.username": "debeziumt", "connection.password": "***", "auto.evolve": "true", "auto.create": "true", "tasks.max": "1", "topics.regex": "vk_nau55.DEBEZIUM.GBC_TBL_SERVICECALL_NC55", "table.name.format": "vk_nau55_tbl_servicecall", "insert.mode": "upsert", "delete.enabled": "true", "primary.key.mode": "record_key", "quote.identifiers": "false", "schema.evolution": "basic", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://naument-sr:8081", "key.converter.schema.registry.url": "http://naument-sr:8081"}
What is the captured database version and mode of depoyment?
(E.g. on-premises, with a specific cloud provider, etc.)
source db: Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production
target db:PostgreSQL 13.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 8.3.1 20191121 (Red Hat 8.3.1-5), 64-bit
What behaviour do you expect?
<Your answer>
What behaviour do you see?
I create souce table and fill it with a few rows:
CREATE TABLE DEBEZIUM.GBC_TBL_SERVICECALL_NC55 ( ID NUMBER(19,0) NOT NULL ENABLE, CREATION_DATE TIMESTAMP (6) NOT NULL ENABLE, CLAIM_TRANSFERDATE DATE, TITLE VARCHAR2(4000 CHAR), CLIENT_EMAIL VARCHAR2(255 CHAR), CLAIM_SUMRETURN FLOAT(126), PRIMARY KEY (ID));ALTER TABLE DEBEZIUM.GBC_TBL_SERVICECALL_NC55 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55( ID, CREATION_DATE, CLAIM_TRANSFERDATE, TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN)VALUES(1, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22); INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55( ID, CREATION_DATE, CLAIM_TRANSFERDATE, TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN)VALUES(2, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22);
than I run source and sink connectors, everything works fine.
Than I run a few dml operations on source db:
INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55( ID, CREATION_DATE, CLAIM_TRANSFERDATE, TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN) VALUES(3, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22); --Finish time Wed Sep 13 20:58:53 2023 DELETE FROM DEBEZIUM.GBC_TBL_SERVICECALL_NC55 WHERE id = 2 --Finish time Wed Sep 13 20:59:54 2023 INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55( ID, CREATION_DATE, CLAIM_TRANSFERDATE, TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN) VALUES(4, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22); --Finish time Wed Sep 13 21:00:31 2023
and it looks like after delete I get an error NullPointerException, but I see that in target db delete statement was made and the last insert wasn't.
So it has only 2 rows with id=1,3.
Error:
Hibernate: DELETE FROM naument1.vk_nau55_tbl_servicecall WHERE ID=? 2023-09-13 20:59:55,957 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask] org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:82) at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:93) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259) at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.NullPointerException at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:281) at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:273) at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
Do you see the same behaviour using the latest relesead Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
I haven't tried beta2 yet.
it looks like I've already tested delete operations with these connectors and it worked fine before Beta1.
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
How to reproduce the issue using our tutorial deployment?
<Your answer>
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
<Your answer>
Implementation ideas (optional)
<Your answer>