Debezium / DBZ-6904

NPE in sink-connector after delete-operation

    • Type: Bug
    • Resolution: Done
    • Priority: Major

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      2.4.0.Beta1 OracleConnector, JdbcSinkConnector

      What is the connector configuration?

      source-connector

       

      {
        "name": "vk_nau55_src",
        "connector.class": "io.debezium.connector.oracle.OracleConnector",
        "tasks.max": "1",
        "database.hostname": "***",
        "database.port": "1521",
        "database.user": "debezium",
        "database.password": "***",
        "database.dbname": "NAUMENT1",
        "database.connection.adapter": "logminer",
        "schema.history.internal.kafka.topic": "vk_nau55_src.schema-changes",
        "schema.history.internal.kafka.bootstrap.servers": "broker1:29092,broker3:29092,broker3:29092",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://naument-sr:8081",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://naument-sr:8081",
        "errors.log.enable": "true",
        "snapshot.lock.timeout.ms": "5000",
        "include.schema.changes": "true",
        "snapshot.mode": "always",
        "decimal.handling.mode": "precise",
        "lob.enabled": "true",
        "datatype.propagate.source.type": ".*",
        "log.mining.session.max.ms": "120000",
        "topic.prefix": "vk_nau55",
        "topic.creation.enable": "true",
        "topic.creation.default.partitions": "1",
        "topic.creation.default.include": "vk_nau55\\.*",
        "topic.creation.default.replication.factor": "1",
        "topic.creation.default.compression.type": "lz4",
        "topic.creation.default.retention.ms": "432000000",
        "table.include.list": "DEBEZIUM.GBC_TBL_SERVICECALL_NC55"
      }

       

       

      sink-connector

       

      {
        "name": "vk_nau55_sink",
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://***:5438/db_ods_test?currentSchema=naument1",
        "connection.username": "debeziumt",
        "connection.password": "***",
        "auto.evolve": "true",
        "auto.create": "true",
        "tasks.max": "1",
        "topics.regex": "vk_nau55.DEBEZIUM.GBC_TBL_SERVICECALL_NC55",
        "table.name.format": "vk_nau55_tbl_servicecall",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "primary.key.mode": "record_key",
        "quote.identifiers": "false",
        "schema.evolution": "basic",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://naument-sr:8081",
        "key.converter.schema.registry.url": "http://naument-sr:8081"
      }

       

       

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      source db: Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production

      target db: PostgreSQL 13.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 8.3.1 20191121 (Red Hat 8.3.1-5), 64-bit

      What behaviour do you expect?

      <Your answer>

      What behaviour do you see?

      I create the source table and fill it with a few rows:

       

      CREATE TABLE DEBEZIUM.GBC_TBL_SERVICECALL_NC55 (
        ID NUMBER(19,0) NOT NULL ENABLE,
        CREATION_DATE TIMESTAMP (6) NOT NULL ENABLE,
        CLAIM_TRANSFERDATE DATE,
        TITLE VARCHAR2(4000 CHAR),
        CLIENT_EMAIL VARCHAR2(255 CHAR),
        CLAIM_SUMRETURN FLOAT(126),
        PRIMARY KEY (ID));

      ALTER TABLE DEBEZIUM.GBC_TBL_SERVICECALL_NC55 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

      INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55(
        ID, CREATION_DATE, CLAIM_TRANSFERDATE,
        TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN)
      VALUES(1, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22);

      INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55(
        ID, CREATION_DATE, CLAIM_TRANSFERDATE,
        TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN)
      VALUES(2, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22);

      Then I run the source and sink connectors, and everything works fine.

       

      Then I run a few DML operations on the source db:

       

      INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55(
        ID, CREATION_DATE, CLAIM_TRANSFERDATE, 
        TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN)
      VALUES(3, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22);
      --Finish time    Wed Sep 13 20:58:53 2023
      
      DELETE FROM DEBEZIUM.GBC_TBL_SERVICECALL_NC55 WHERE id = 2;
      --Finish time    Wed Sep 13 20:59:54 2023
      
      INSERT INTO DEBEZIUM.GBC_TBL_SERVICECALL_NC55(
        ID, CREATION_DATE, CLAIM_TRANSFERDATE, 
        TITLE, CLIENT_EMAIL, CLAIM_SUMRETURN)
      VALUES(4, SYSTIMESTAMP, SYSDATE, 'TITLE 1', 'EMAIL 1', 111.22);
      --Finish time    Wed Sep 13 21:00:31 2023 

      After the delete, the sink connector appears to fail with a NullPointerException. In the target db the DELETE statement was applied, but the last insert (id=4) was not.

      So the target table has only 2 rows, with id=1 and id=3.

      Error:

      Hibernate: DELETE FROM naument1.vk_nau55_tbl_servicecall WHERE ID=?
      2023-09-13 20:59:55,957 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
          at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:82)
          at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:93)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.lang.NullPointerException
          at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:281)
          at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:273)
          at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70) 
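
      A possible reading of the trace, not confirmed by anything in this report: Debezium source connectors by default emit a tombstone (a record with a key but a null value and null value schema) right after each delete event, and a check such as isFlattened would throw if it read the value schema without guarding against null. The sketch below only illustrates that kind of guard. FakeRecord and TombstoneGuard are hypothetical stand-ins, not the Kafka Connect SinkRecord API or the connector's SinkRecordDescriptor, and the ".Envelope" suffix test is an assumption about how an unflattened Debezium change event would be recognised.

```java
// Hypothetical stand-in for a sink record; NOT the real Kafka Connect SinkRecord.
final class FakeRecord {
    final String valueSchemaName; // null when the record carries no value schema
    final Object value;           // null for a tombstone

    FakeRecord(String valueSchemaName, Object value) {
        this.valueSchemaName = valueSchemaName;
        this.value = value;
    }
}

// Hypothetical helper showing a null-safe classification of incoming records.
final class TombstoneGuard {
    // A tombstone has neither a value nor a value schema.
    static boolean isTombstone(FakeRecord record) {
        return record.value == null && record.valueSchemaName == null;
    }

    // Null-safe "flattened" check: a record counts as flattened unless its
    // value schema name ends with ".Envelope" (assumed naming convention).
    static boolean isFlattened(FakeRecord record) {
        String name = record.valueSchemaName;
        return name == null || !name.endsWith(".Envelope");
    }

    // Classify a record, testing for tombstones before touching the schema.
    static String describe(FakeRecord record) {
        if (isTombstone(record)) {
            return "tombstone";
        }
        return isFlattened(record) ? "flattened" : "envelope";
    }

    public static void main(String[] args) {
        FakeRecord delete = new FakeRecord(
                "vk_nau55.DEBEZIUM.GBC_TBL_SERVICECALL_NC55.Envelope", new Object());
        FakeRecord tombstone = new FakeRecord(null, null);
        System.out.println(describe(delete));
        System.out.println(describe(tombstone));
    }
}
```

      If this reading is right, the order of events would match what was observed: the delete record is applied to the target table, the record that follows it fails, and the later insert for id=4 is never processed.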

       

       

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version) 

      I haven't tried Beta2 yet.

      I believe I had already tested delete operations with these connectors, and they worked fine before Beta1.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      nau55.txt

      How to reproduce the issue using our tutorial deployment?

      <Your answer>

      Assignee: Unassigned
      Reporter: Valeriia Kapishevskaia (v_kapishevskaya)