Debezium / DBZ-7519

Cross-database table creation by DBZ JDBC sink connector fails due to unprocessed default values

      Bug Report

      What Debezium connector do you use and what version?

      Debezium Oracle source connector 2.6.0-SNAPSHOT (commit f2f1ca6)
      Debezium JDBC sink connector 2.6.0.Alpha2

      What is the connector configuration?

      connector-config.json
      {
          "name": "debezium-jdbc-sink-connector",
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "connection.password": "***",
          "connection.url": "***",
          "connection.username": "***",
          "delete.enabled": "false",
          "insert.mode": "upsert",
          "primary.key.mode": "record_key",
          "schema.evolution": "basic",
          "topics.regex": "topic_prefix.*",
          "table.name.format": "target_schema.${topic}",
          "transforms": "dropPrefix",
          "transforms.dropPrefix.regex": "topic_prefix.(.*)",
          "transforms.dropPrefix.replacement": "$1",
          "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "***",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "***"
      }
      

      What is the captured database version and mode of deployment?

      Source: Oracle Database 19c
      Target: PostgreSQL 12
      Deployment: On-premise

      What behaviour do you expect?

      The connector should create the target tables in PostgreSQL successfully when replicating schemas from Oracle, either translating Oracle default timestamp values into valid PostgreSQL expressions or safely omitting them, without failing on default values or timestamp formats.

      What behaviour do you see?

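      Table creation fails when the JDBC sink connector generates PostgreSQL DDL containing default values serialized from Oracle. The Oracle default expressions are embedded verbatim in `DEFAULT '...'` clauses without escaping their inner single quotes (for example `DEFAULT 'timestamp '3000-01-01 00:00:00.000 +00:00'' NULL` in the logged `CREATE TABLE` statement), so part of each timestamp literal ends up outside the quoted string. Hibernate then interprets `:00` in that unquoted text as a named parameter, and the statement fails with `Named parameter not bound : 00`. Even with correct quoting, expressions such as `SYSTIMESTAMP(9) at time zone 'UTC'` are Oracle SQL, not valid PostgreSQL timestamp values.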
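
      To make the failure mode concrete, here is a simplified Java model of named-parameter detection. It is not Hibernate's actual parser; the class `NamedParamScan` and its methods are hypothetical. It collects `:name` tokens that appear outside single-quoted literals and compares the quoting seen in the log with standard SQL escaping (doubling embedded single quotes).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (NOT Hibernate's real parser) of how a query
 * parser finds ":name" placeholders outside single-quoted SQL literals.
 */
public class NamedParamScan {

    /** Wraps a value in single quotes, doubling embedded quotes (standard SQL escaping). */
    public static String quoteLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Returns every ":name" token that appears outside a quoted literal. */
    public static List<String> namedParameters(String sql) {
        List<String> names = new ArrayList<>();
        boolean inLiteral = false;
        int i = 0;
        while (i < sql.length()) {
            char c = sql.charAt(i);
            if (inLiteral) {
                if (c == '\'') {
                    // '' inside a literal is an escaped quote; a lone ' closes the literal
                    if (i + 1 < sql.length() && sql.charAt(i + 1) == '\'') {
                        i += 2;
                        continue;
                    }
                    inLiteral = false;
                }
                i++;
            } else if (c == '\'') {
                inLiteral = true;
                i++;
            } else if (c == ':' && i + 1 < sql.length() && sql.charAt(i + 1) == ':') {
                i += 2; // skip PostgreSQL "::" casts
            } else if (c == ':') {
                int start = ++i;
                while (i < sql.length()
                        && (Character.isLetterOrDigit(sql.charAt(i)) || sql.charAt(i) == '_')) {
                    i++;
                }
                if (i > start) {
                    names.add(sql.substring(start, i));
                }
            } else {
                i++;
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String oracleDefault = "timestamp '3000-01-01 00:00:00.000 +00:00'";
        // Naive quoting, as in the generated DDL from the log
        String naive = "COL_T timestamp(6) with time zone DEFAULT '" + oracleDefault + "' NULL";
        // Escaped quoting keeps the whole expression inside one literal
        String escaped = "COL_T timestamp(6) with time zone DEFAULT " + quoteLiteral(oracleDefault) + " NULL";
        System.out.println(namedParameters(naive));   // [00, 00, 00]
        System.out.println(namedParameters(escaped)); // []
    }
}
```

      Escaping alone would only avoid the parameter-binding error: PostgreSQL would then try to read the whole Oracle expression as a timestamp string and reject it, so the default values still need to be translated or omitted.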

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      docker logs kafka-connect-debezium
      [2024-02-19 14:45:49,793] DEBUG [debezium-jdbc-sink-connector|task-0] Attempting to create table 'target_schema.TABLE_A'. (io.debezium.connector.jdbc.JdbcChangeEventSink:276)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      SLF4J: Failed toString() invocation on an object of type [io.debezium.util.Stopwatch$SingleDuration]
      Reported exception:
      java.lang.NullPointerException
      	at io.debezium.util.Stopwatch$BaseDurations.toString(Stopwatch.java:488)
      	at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:277)
      	at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:249)
      	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:211)
      	at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:161)
      	at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:124)
      	at org.slf4j.impl.Reload4jLoggerAdapter.trace(Reload4jLoggerAdapter.java:112)
      	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:122)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      [2024-02-19 14:45:49,793] TRACE [debezium-jdbc-sink-connector|task-0] SQL: CREATE TABLE target_schema.TABLE_A (COL_A double precision NOT NULL, COL_B text NOT NULL, COL_C text NOT NULL, COL_D double precision NULL, COL_E double precision NULL, COL_F double precision NULL, COL_G double precision NULL, COL_F timestamp(6) with time zone NULL, COL_G timestamp(6) with time zone NULL, COL_H timestamp(6) with time zone NULL, COL_I timestamp(6) with time zone NULL, COL_J double precision NULL, COL_K double precision NULL, COL_L text NULL, COL_M double precision NULL, COL_N text NULL, COL_O text NULL, COL_P double precision NULL, COL_Q timestamp(6) with time zone NULL, COL_R text NULL, COL_S timestamp(6) with time zone DEFAULT 'SYSTIMESTAMP(9) at time zone 'UTC'' NULL, COL_T timestamp(6) with time zone DEFAULT 'timestamp '3000-01-01 00:00:00.000 +00:00'' NULL, COL_U double precision NULL, COL_V double precision NULL, COL_W double precision NULL, PRIMARY KEY(COL_A)) (io.debezium.connector.jdbc.JdbcChangeEventSink:286)
      [2024-02-19 14:45:49,794] ERROR [debezium-jdbc-sink-connector|task-0] Failed to process record: Failed to process a sink record (io.debezium.connector.jdbc.JdbcSinkConnectorTask:112)
      org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
      	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
      	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.hibernate.QueryException: Named parameter not bound : 00
      	at org.hibernate.query.internal.QueryParameterBindingsImpl.lambda$validate$0(QueryParameterBindingsImpl.java:136)
      	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      	at org.hibernate.query.internal.ParameterMetadataImpl.visitParameters(ParameterMetadataImpl.java:193)
      	at org.hibernate.query.spi.ParameterMetadataImplementor.visitRegistrations(ParameterMetadataImplementor.java:29)
      	at org.hibernate.query.internal.QueryParameterBindingsImpl.validate(QueryParameterBindingsImpl.java:132)
      	at org.hibernate.query.spi.AbstractSelectionQuery.beforeQuery(AbstractSelectionQuery.java:382)
      	at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:640)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.createTable(JdbcChangeEventSink.java:287)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:240)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:199)
      	... 17 more
      [2024-02-19 14:45:49,795] DEBUG [debezium-jdbc-sink-connector|task-0] Rewinding topic TABLE_B offset to 0. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
      [2024-02-19 14:45:49,795] DEBUG [debezium-jdbc-sink-connector|task-0] Rewinding topic TABLE_C offset to 2959187. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
      [2024-02-19 14:45:49,795] DEBUG [debezium-jdbc-sink-connector|task-0] Rewinding topic TABLE_D offset to 2339. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
      [2024-02-19 14:45:49,795] DEBUG [debezium-jdbc-sink-connector|task-0] Rewinding topic TABLE_A offset to 385416. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
      [2024-02-19 14:45:49,795] TRACE [debezium-jdbc-sink-connector|task-0] [PERF] Total put execution time   7.46351s total;   1 samples;  7.46351s avg;  7.46351s min;  7.46351s max (io.debezium.connector.jdbc.JdbcSinkConnectorTask:120)
      [2024-02-19 14:45:49,795] TRACE [debezium-jdbc-sink-connector|task-0] [PERF] Sink execute execution time [FAILED toString()] (io.debezium.connector.jdbc.JdbcSinkConnectorTask:121)
      [2024-02-19 14:45:49,796] TRACE [debezium-jdbc-sink-connector|task-0] [PERF] Mark processed execution time [FAILED toString()] (io.debezium.connector.jdbc.JdbcSinkConnectorTask:122)
      [2024-02-19 14:45:49,821] ERROR [debezium-jdbc-sink-connector|task-0] JDBC sink connector failure (io.debezium.connector.jdbc.JdbcSinkConnectorTask:95)
      org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
      	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
      	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.hibernate.QueryException: Named parameter not bound : 00
      	at org.hibernate.query.internal.QueryParameterBindingsImpl.lambda$validate$0(QueryParameterBindingsImpl.java:136)
      	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      	at org.hibernate.query.internal.ParameterMetadataImpl.visitParameters(ParameterMetadataImpl.java:193)
      	at org.hibernate.query.spi.ParameterMetadataImplementor.visitRegistrations(ParameterMetadataImplementor.java:29)
      	at org.hibernate.query.internal.QueryParameterBindingsImpl.validate(QueryParameterBindingsImpl.java:132)
      	at org.hibernate.query.spi.AbstractSelectionQuery.beforeQuery(AbstractSelectionQuery.java:382)
      	at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:640)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.createTable(JdbcChangeEventSink.java:287)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:240)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:199)
      	... 17 more
      [2024-02-19 14:45:49,822] ERROR [debezium-jdbc-sink-connector|task-0] WorkerSinkTask{id=debezium-jdbc-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure (org.apache.kafka.connect.runtime.WorkerSinkTask:616)
      org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
      	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
      	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
      	... 12 more
      Caused by: org.hibernate.QueryException: Named parameter not bound : 00
      	at org.hibernate.query.internal.QueryParameterBindingsImpl.lambda$validate$0(QueryParameterBindingsImpl.java:136)
      	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      	at org.hibernate.query.internal.ParameterMetadataImpl.visitParameters(ParameterMetadataImpl.java:193)
      	at org.hibernate.query.spi.ParameterMetadataImplementor.visitRegistrations(ParameterMetadataImplementor.java:29)
      	at org.hibernate.query.internal.QueryParameterBindingsImpl.validate(QueryParameterBindingsImpl.java:132)
      	at org.hibernate.query.spi.AbstractSelectionQuery.beforeQuery(AbstractSelectionQuery.java:382)
      	at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:640)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.createTable(JdbcChangeEventSink.java:287)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:240)
      	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:199)
      	... 17 more
      
      

      How to reproduce the issue using our tutorial deployment?

      1. Set up an Oracle source database with tables whose columns have default timestamp values (such as the `SYSTIMESTAMP(9) at time zone 'UTC'` and `timestamp '3000-01-01 00:00:00.000 +00:00'` defaults in the log above).
      2. Configure the Debezium JDBC sink connector to replicate these tables to a PostgreSQL database.
      3. Observe that table creation in PostgreSQL fails on the default timestamp values with `Named parameter not bound : 00`.

      Implementation ideas

      Default value parsing

      Introduce a configuration option such as `parse.default.values` which, when set to `true`, makes the connector attempt to parse source default values and convert them into a form the target database accepts. This requires a conversion mechanism that interprets source default value expressions and rewrites them according to the data types and default value expressions the target database supports.

      For complex cases where automatic parsing might not be feasible or reliable, the connector could log a warning and either skip the default value or use a safe fallback, such as setting the column to nullable without a default.
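
      A minimal sketch of the parsing idea and its fallback, assuming the sink can see the raw Oracle default expression text (as it appears in the generated DDL) and that the target column is `timestamp with time zone`. The class `OracleDefaultTranslator` is hypothetical, the two patterns cover only the expressions from the log above, and the chosen PostgreSQL equivalents are assumptions rather than Debezium behavior.

```java
import java.util.Optional;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical translator for the proposed "parse.default.values" option:
 * maps a few Oracle default expressions to PostgreSQL equivalents for
 * TIMESTAMP WITH TIME ZONE columns. Unrecognized expressions are dropped
 * with a warning (the "skip the default value" fallback).
 */
public class OracleDefaultTranslator {

    private static final Logger LOG = Logger.getLogger(OracleDefaultTranslator.class.getName());

    // SYSTIMESTAMP, optionally with a precision and an AT TIME ZONE clause
    private static final Pattern SYSTIMESTAMP = Pattern.compile(
            "(?i)\\s*SYSTIMESTAMP(\\s*\\(\\s*\\d+\\s*\\))?(\\s+AT\\s+TIME\\s+ZONE\\s+'[^']*')?\\s*");

    // TIMESTAMP '<literal>' where the literal contains no embedded quotes
    private static final Pattern TIMESTAMP_LITERAL = Pattern.compile(
            "(?i)\\s*TIMESTAMP\\s+'([^']*)'\\s*");

    /** Returns a PostgreSQL default expression, or empty if the column should get no default. */
    public static Optional<String> translate(String oracleDefault) {
        if (oracleDefault == null || oracleDefault.isBlank()) {
            return Optional.empty();
        }
        if (SYSTIMESTAMP.matcher(oracleDefault).matches()) {
            // Assumption: for a timestamptz column only the instant matters,
            // so the AT TIME ZONE clause can be dropped.
            return Optional.of("CURRENT_TIMESTAMP");
        }
        Matcher m = TIMESTAMP_LITERAL.matcher(oracleDefault);
        if (m.matches()) {
            // A plain TIMESTAMP '...' literal in PostgreSQL ignores the offset,
            // so emit a TIMESTAMP WITH TIME ZONE literal to keep "+00:00".
            return Optional.of("TIMESTAMP WITH TIME ZONE '" + m.group(1) + "'");
        }
        LOG.warning("Skipping untranslatable default value: " + oracleDefault);
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(translate("SYSTIMESTAMP(9) at time zone 'UTC'"));
        // Optional[CURRENT_TIMESTAMP]
        System.out.println(translate("timestamp '3000-01-01 00:00:00.000 +00:00'"));
        // Optional[TIMESTAMP WITH TIME ZONE '3000-01-01 00:00:00.000 +00:00']
        System.out.println(translate("my_pkg.next_val()"));
        // Optional.empty (plus a logged warning)
    }
}
```

      Returning `Optional.empty()` for anything unrecognized corresponds to the "log a warning and skip the default value" fallback; a real implementation would need per-dialect rules and awareness of the target column type.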

      Include/exclude list for default values

      Provide configuration options like `default.values.include.list` and `default.values.exclude.list`, allowing users to specify which columns should have their default values included or excluded in the generated DDL statements. This list could support regex patterns for flexibility.

      When processing table creation DDL statements, the connector would use these lists to determine whether to include or omit default values for each column. If a column's default value is to be omitted, the connector would generate the column without a default value, making it nullable if not already so.
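
      A sketch of how the proposed include/exclude lists might be evaluated. Everything here is hypothetical: the `DefaultValueFilter` class, the `<table>.<column>` identifier format, and full-match regex semantics. If both lists were set, this sketch lets the include list win; a real option might instead reject that combination. It requires Java 16+ (records, `Stream.toList()`).

```java
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical filter for the proposed default.values.include.list /
 * default.values.exclude.list options. Column identifiers are assumed to be
 * "<table>.<column>"; each list entry is a regex that must match the whole identifier.
 */
public class DefaultValueFilter {

    /** Minimal column model used only for this sketch. */
    public record Column(String name, String type, String defaultValue, boolean nullable) {}

    private final List<Pattern> include;
    private final List<Pattern> exclude;

    public DefaultValueFilter(List<String> includeList, List<String> excludeList) {
        this.include = includeList.stream().map(Pattern::compile).toList();
        this.exclude = excludeList.stream().map(Pattern::compile).toList();
    }

    private static boolean anyMatch(List<Pattern> patterns, String id) {
        return patterns.stream().anyMatch(p -> p.matcher(id).matches());
    }

    /** True if the column's default value should be kept in the generated DDL. */
    public boolean keepDefault(String table, String column) {
        String id = table + "." + column;
        if (!include.isEmpty()) {
            return anyMatch(include, id); // include list takes precedence (assumption)
        }
        return !anyMatch(exclude, id);
    }

    /** Drops a filtered-out default and makes the column nullable, as the proposal describes. */
    public Column apply(String table, Column col) {
        if (col.defaultValue() == null || keepDefault(table, col.name())) {
            return col;
        }
        return new Column(col.name(), col.type(), null, true);
    }

    public static void main(String[] args) {
        DefaultValueFilter filter = new DefaultValueFilter(List.of(), List.of("TABLE_A\\.COL_(S|T)"));
        Column colS = new Column("COL_S", "timestamp(6) with time zone", "SYSTIMESTAMP", false);
        Column colU = new Column("COL_U", "double precision", "0", false);
        System.out.println(filter.apply("TABLE_A", colS)); // default dropped, nullable
        System.out.println(filter.apply("TABLE_A", colU)); // unchanged
    }
}
```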

            ccranfor@redhat.com Chris Cranford
            naivedigit Theodore Evans