  1. Debezium
  2. DBZ-6682

Wrong behavior of quote.identifiers in JdbcSinkConnector

    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • 2.4.0.Beta1
    • None
    • jdbc-connector
    • None

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-jdbc-2.3.0.Final

      What is the connector configuration?

      source

      {
          "name": "vk_nau12",
          "connector.class": "io.debezium.connector.oracle.OracleConnector",
          "database.connection.adapter": "logminer",

          "database.hostname": "...",
          "database.port": "1521",
          "database.user": "debezium",
          "database.password": "...",
          "database.dbname": "...",

          "topic.prefix": "vk_nau12",
          "snapshot.mode": "initial",
          "lob.enabled": "true",

          "log.mining.strategy": "redo_log_catalog",
          "log.mining.session.max.ms": "120000",

          "schema.history.internal.kafka.topic": "vk_nau12.schema-changes",
          "schema.history.internal.kafka.bootstrap.servers": "broker1:29092,broker3:29092,broker3:29092",
          "schema.history.internal.store.only.captured.tables.ddl": "true",
          "schema.history.skip.unparseable.ddl": "false",

          "topic.creation.enable": "true",
          "topic.creation.default.partitions": "1",
          "topic.creation.default.include": "vk_nau12\\.*",
          "topic.creation.default.replication.factor": "1",
          "topic.creation.default.compression.type": "lz4",
          "topic.creation.default.retention.ms": "432000000",

          "transforms": "lowerCaseTopic",
          "transforms.lowerCaseTopic.type": "ru.rgs.kafka.connect.transforms.LowerCaseTopic",

          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://naument-sr:8081",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://naument-sr:8081",

          "table.include.list": "DEBEZIUM.VK_NAU12_TBL_SERVICECALL"
      }

       

      sink

      {
          "name": "vk_nau12_sink",
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",

          "connection.url": "jdbc:postgresql:..",
          "connection.username": "debeziumt",
          "connection.password": "..",

          "topics.regex": "vk_nau12.debezium.vk_nau12_tbl_servicecall",

          "auto.evolve": "true",
          "auto.create": "true",

          "delete.enabled": "true",

          "tasks.max": "1",

          "insert.mode": "upsert",
          "primary.key.mode": "record_key",
          "quote.identifiers": "false",
          "schema.evolution": "basic",
          "table.name.format": "vk_nau12_tbl_servicecall",

          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://naument-sr:8081",
          "key.converter.schema.registry.url": "http://naument-sr:8081"
      }

       

       

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      source db: Oracle Database 12c Enterprise Edition Release 12.2.0.1.0 - 64bit Production

      target db: PostgreSQL 13.2 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 8.3.1 20191121 (Red Hat 8.3.1-5), 64-bit

       

      What behaviour do you expect?

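      With "quote.identifiers": "false", the sink connector should create the target table with lower-case field names and keep processing records without failing.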

      What behaviour do you see?

      The sink connector fails with a SQLException: Cannot ALTER table 'vk_nau12_tbl_servicecall' because field 'ID' is not optional but has no default value.

       

      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
          at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
          at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
          at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:80)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
          ... 11 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
          at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
          at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
          ... 12 more
      Caused by: java.sql.SQLException: Cannot ALTER table 'vk_nau12_tbl_servicecall' because field 'ID' is not optional but has no default value
          at io.debezium.connector.jdbc.JdbcChangeEventSink.alterTableIfNeeded(JdbcChangeEventSink.java:181)
          at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:118)
          at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:68)
          ... 13 more

       

      When I check the target database afterwards, I see that the sink connector managed to create the table and insert one row into it.

      In the config I use "quote.identifiers": "false" because my source database is Oracle, which has upper-case field names. My target database is Postgres, and I want the field names there in the default lower case.
      If I set "quote.identifiers": "true", the connector doesn't fail, but it creates the target table with upper-case field names, as expected, and that is not an option for me.
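
      For readers unfamiliar with the naming rule behind this, here is a minimal sketch of how PostgreSQL treats identifiers: unquoted names are folded to lower case, quoted names keep their case. The class and method names (IdentifierFolding, storedName, render) are made up for illustration and are not part of Debezium or the PostgreSQL JDBC driver.

```java
import java.util.Locale;

public class IdentifierFolding {

    // PostgreSQL folds unquoted identifiers to lower case;
    // double-quoted identifiers are stored exactly as written.
    public static String storedName(String identifier, boolean quoted) {
        return quoted ? identifier : identifier.toLowerCase(Locale.ROOT);
    }

    // Renders the identifier the way it would be written in a SQL statement.
    // Embedded double quotes are escaped by doubling them.
    public static String render(String identifier, boolean quoted) {
        return quoted ? "\"" + identifier.replace("\"", "\"\"") + "\"" : identifier;
    }

    public static void main(String[] args) {
        // Hypothetical illustration using the 'ID' field from the error message.
        for (boolean quoted : new boolean[] {false, true}) {
            System.out.println(render("ID", quoted) + " is stored as " + storedName("ID", quoted));
        }
    }
}
```

      Under this rule, a column written as ID without quotes ends up as id in the target table, while "ID" with quotes stays upper case, which matches the two outcomes described above.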

      So my guess is that "quote.identifiers": "false" does not work quite correctly.
      It creates the table with field names in the default lower case, but afterwards it runs some ALTER TABLE commands to mark fields nullable or non-nullable, and there it may quote the field names. This would explain the 'ID' in the error message: "because field 'ID' is not optional but has no default value".
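
      One hypothetical way such a symptom could arise is a case-sensitive comparison between the record's field names and the column names read back from the target table. The sketch below only illustrates that idea; it is an assumption, not the connector's actual code, and the names MissingColumnCheck and missingFields as well as the TITLE column are invented for the example.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class MissingColumnCheck {

    // Returns the record fields that have no matching column in the target table.
    // With ignoreCase = false, 'ID' and 'id' are treated as different names.
    public static Set<String> missingFields(List<String> recordFields,
                                            Set<String> tableColumns,
                                            boolean ignoreCase) {
        Set<String> known = new LinkedHashSet<>();
        for (String column : tableColumns) {
            known.add(ignoreCase ? column.toLowerCase(Locale.ROOT) : column);
        }
        Set<String> missing = new LinkedHashSet<>();
        for (String field : recordFields) {
            String key = ignoreCase ? field.toLowerCase(Locale.ROOT) : field;
            if (!known.contains(key)) {
                missing.add(field);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Upper-case field names as they come from Oracle; TITLE is a made-up column.
        List<String> recordFields = List.of("ID", "TITLE");
        // Lower-case column names as PostgreSQL stores unquoted identifiers.
        Set<String> tableColumns = Set.of("id", "title");

        // Case-sensitive: every field looks missing, which would trigger an ALTER attempt.
        System.out.println(missingFields(recordFields, tableColumns, false));
        // Case-insensitive: nothing is missing, so no ALTER would be needed.
        System.out.println(missingFields(recordFields, tableColumns, true));
    }
}
```

      If a check like the case-sensitive variant ran after the table was created, a non-optional field such as 'ID' would look like a new column that cannot be added without a default value, which is consistent with the failure in alterTableIfNeeded shown in the stack trace.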

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      I've tried only debezium-connector-jdbc-2.3.0.Final

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Yes, I can provide them if necessary.

      How to reproduce the issue using our tutorial deployment?

            rh-ee-mvitale Mario Fiore Vitale
            v_kapishevskaya Valeriia Kapishevskaia (Inactive)