Debezium / DBZ-7620

JDBC connector does not process ByteBuffer field value

    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • 2.6.0.CR1
    • 2.6.0.Beta1
    • jdbc-connector

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-oracle version 2.6.0.Alpha2

      What is the connector configuration?

      {
        "name": "source-data-types-logminer-connector",
        "config": {
          "connector.class": "io.debezium.connector.oracle.OracleConnector",
          "tasks.max": "1",
          "database.hostname": "oracle",
          "database.port": "1521",
          "database.user": "c##dbzuser",
          "database.password": "dbz",
          "database.dbname": "orclcdb",
          "database.pdb.name": "orclpdb1",
          "database.connection.adapter": "logminer",
          "topic.prefix": "logminer",
          "lob.enabled": "true",
          "schema.name.adjustment.mode": "avro",
          "table.include.list": "C##DBZUSER.SOURCE_DATA_TYPES",
          "include.schema.changes": "false",
          "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
          "schema.history.internal.kafka.topic": "schema-changes.data-types-logminer",
          "heartbeat.interval.ms": "60000",
          "log.mining.strategy": "online_catalog",
          "log.mining.query.filter.mode": "in",
          "custom.metric.tags": "connector=source-data-types-connector",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081"
        }
      }
      
      {
        "name": "sink-data-types-logminer-connector",
        "config": {
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "connection.url": "jdbc:oracle:thin:@oracle:1521/orclpdb1",
          "connection.username": "c##dbzuser",
          "connection.password": "dbz",
          "topics": "logminer.C__DBZUSER.SOURCE_DATA_TYPES",
          "insert.mode": "upsert",
          "delete.enabled": "true",
          "primary.key.mode": "record_key",
          "table.name.format": "c##dbzuser.SINK_DATA_TYPES",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081"
        }
      }
      

      What is the captured database version and mode of deployment?

      Oracle Database 19, Docker

      What behaviour do you expect?

      The JDBC connector successfully processes a record with a ByteBuffer field value deserialized by AvroConverter for a BYTES schema.

      What behaviour do you see?

      The JDBC connector fails while processing a record with a ByteBuffer field value deserialized by AvroConverter for a BYTES schema.

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
      ... 11 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
      at java.base/java.util.HashMap.forEach(HashMap.java:1337)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
      ... 12 more
      Caused by: java.lang.RuntimeException: java.sql.SQLException: Invalid column type
      at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:31)
      at java.base/java.lang.Iterable.forEach(Iterable.java:75)
      at io.debezium.connector.jdbc.RecordWriter.bindFieldValuesToQuery(RecordWriter.java:158)
      at io.debezium.connector.jdbc.RecordWriter.bindNonKeyValuesToQuery(RecordWriter.java:141)
      at io.debezium.connector.jdbc.RecordWriter.bindValues(RecordWriter.java:115)
      at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:75)
      at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
      at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$4(AbstractSharedSessionContract.java:966)
      at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
      at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:977)
      at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:965)
      at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
      ... 17 more
      Caused by: java.sql.SQLException: Invalid column type
      at oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical(OraclePreparedStatement.java:8569)
      at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8051)
      at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8826)
      at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8801)
      at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:218)
      at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.setObject(NewProxyPreparedStatement.java:821)
      at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:27)
      ... 29 more
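
      The innermost frames show PreparedStatementQueryBinder.bind passing the value to setObject, where the Oracle driver rejects it with "Invalid column type". One possible way to avoid handing a ByteBuffer to the driver is to copy it into a byte[] first. The sketch below is illustrative only: ByteBufferBinding and its methods are hypothetical names, not Debezium code, and this is not necessarily how the linked pull request addresses the problem. It assumes the driver accepts byte[] for binary columns through setObject.

```java
import java.nio.ByteBuffer;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper for illustration; not part of Debezium. */
final class ByteBufferBinding {

    private ByteBufferBinding() {
    }

    /** Copies the remaining bytes of the buffer without moving its position. */
    static byte[] toBytes(ByteBuffer buffer) {
        // duplicate() shares the content but has an independent position and limit.
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /** Converts ByteBuffer values to byte[]; returns every other value unchanged. */
    static Object normalize(Object value) {
        return value instanceof ByteBuffer ? toBytes((ByteBuffer) value) : value;
    }

    /** Binds a value at the 1-based parameter index after normalizing it. */
    static void bind(PreparedStatement statement, int index, Object value) throws SQLException {
        // Assumption: the JDBC driver accepts byte[] for RAW/BLOB parameters.
        statement.setObject(index, normalize(value));
    }
}
```

      With such a helper, a call like ByteBufferBinding.bind(statement, 1, fieldValue) would pass a byte[] to the driver whenever the converter produced a ByteBuffer, and leave all other values as they are.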
      
      

      Feature request or enhancement

      https://github.com/debezium/debezium-connector-jdbc/pull/66

      How to reproduce the issue using our tutorial deployment?

      1. Create a new source table:

      CREATE TABLE c##dbzuser.source_data_types (
        id                       NUMBER(10) GENERATED BY DEFAULT ON NULL AS IDENTITY (START with 1 INCREMENT by 1) NOT NULL PRIMARY KEY,
        c_varchar2               VARCHAR2(400),
        c_nvarchar2              NVARCHAR2(400),
        c_number                 NUMBER(20),
        c_float                  FLOAT(63),
        c_decimal                DECIMAL(16),
        c_long                   LONG,
        c_binary_float           BINARY_FLOAT,
        c_binary_double          BINARY_DOUBLE,
        c_date                   DATE,
        c_timestamp              TIMESTAMP,
        c_timestamp_wtz          TIMESTAMP WITH TIME ZONE,
        c_timestamp_wltz         TIMESTAMP WITH LOCAL TIME ZONE,
        c_raw                    RAW(1000),
        c_char                   CHAR(100),
        c_nchar                  NCHAR(100),
        c_clob                   CLOB,
        c_nclob                  NCLOB,
        c_blob                   BLOB
      );
      

      2. Create a new sink table:

      CREATE TABLE c##dbzuser.sink_data_types (
        id                       NUMBER(10) NOT NULL PRIMARY KEY,
        c_varchar2               VARCHAR2(400),
        c_nvarchar2              NVARCHAR2(400),
        c_number                 NUMBER(20),
        c_float                  FLOAT(63),
        c_decimal                DECIMAL(16),
        c_long                   LONG,
        c_binary_float           BINARY_FLOAT,
        c_binary_double          BINARY_DOUBLE,
        c_date                   DATE,
        c_timestamp              TIMESTAMP,
        c_timestamp_wtz          TIMESTAMP WITH TIME ZONE,
        c_timestamp_wltz         TIMESTAMP WITH LOCAL TIME ZONE,
        c_raw                    RAW(1000),
        c_char                   CHAR(100),
        c_nchar                  NCHAR(100),
        c_clob                   CLOB,
        c_nclob                  NCLOB,
        c_blob                   BLOB
      );
      

      3. Create a new source connector:

      curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d '
      {
        "name": "source-data-types-logminer-connector",
        "config": {
          "connector.class": "io.debezium.connector.oracle.OracleConnector",
          "tasks.max": "1",
          "database.hostname": "oracle",
          "database.port": "1521",
          "database.user": "c##dbzuser",
          "database.password": "dbz",
          "database.dbname": "orclcdb",
          "database.pdb.name": "orclpdb1",
          "database.connection.adapter": "logminer",
          "topic.prefix": "logminer",
          "lob.enabled": "true",
          "schema.name.adjustment.mode": "avro",
          "table.include.list": "C##DBZUSER.SOURCE_DATA_TYPES",
          "include.schema.changes": "false",
          "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
          "schema.history.internal.kafka.topic": "schema-changes.data-types-logminer",
          "heartbeat.interval.ms": "60000",
          "log.mining.strategy": "online_catalog",
          "log.mining.query.filter.mode": "in",
          "custom.metric.tags": "connector=source-data-types-connector",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081"
        }
      }'
      

      4. Create a new sink connector:

      curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d '
      {
        "name": "sink-data-types-logminer-connector",
        "config": {
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "connection.url": "jdbc:oracle:thin:@oracle:1521/orclpdb1",
          "connection.username": "c##dbzuser",
          "connection.password": "dbz",
          "topics": "logminer.C__DBZUSER.SOURCE_DATA_TYPES",
          "insert.mode": "upsert",
          "delete.enabled": "true",
          "primary.key.mode": "record_key",
          "table.name.format": "c##dbzuser.SINK_DATA_TYPES",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081"
        }
      }'
      

      5. Insert a new record into the source table:

      INSERT INTO c##dbzuser.source_data_types (
        c_varchar2,
        c_nvarchar2,
        c_number,
        c_float,
        c_decimal,
        c_long,
        c_binary_float,
        c_binary_double,
        c_date,
        c_timestamp,
        c_timestamp_wtz,
        c_timestamp_wltz,
        c_raw,
        c_char,
        c_nchar,
        c_clob,
        c_nclob,
        c_blob
      )
      VALUES (
        'varchar2',
        'nvarchar2',
        1,
        12.12,
        123.123,
        1234,
        1.234567E+003,
        1.234567E+003,
        TO_DATE('2019-01-30 18:30:52', 'YYYY-MM-DD HH24:MI:SS'),
        TO_TIMESTAMP('2019-01-30 18:30:52.123000','YYYY-MM-DD HH24:MI:SS.FF'),
        TO_TIMESTAMP_TZ('2019-01-30 18:30:52.123000 -08:00','YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'),
        TO_TIMESTAMP('2019-01-30 18:30:52.123000','YYYY-MM-DD HH24:MI:SS.FF'),
        HEXTORAW('7D'),
        'char',
        'nchar',
        TO_CLOB('clob'),
        TO_NCLOB('nclob'),
        TO_BLOB(HEXTORAW('7D'))
      );
      commit;
      

      6. Check the sink connector status.

      Expected result: Running
      Actual result: Failed

      Caused by: java.sql.SQLException: Invalid column type
      at oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical(OraclePreparedStatement.java:8569)
      at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8051)
      at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8826)
      at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8801)
      at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:218)
      at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.setObject(NewProxyPreparedStatement.java:821)
      at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:27)
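
      The status check in step 6 can be done against the Kafka Connect REST API, which exposes a connector's state under /connectors/{name}/status. The following is a minimal sketch that assumes the same localhost:8083 endpoint used in steps 3 and 4; ConnectorStatusCheck and its methods are hypothetical names introduced here for illustration.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical helper for step 6; not part of Debezium or Kafka Connect. */
final class ConnectorStatusCheck {

    private ConnectorStatusCheck() {
    }

    /** Builds the Kafka Connect REST status URI for a connector. */
    static URI statusUri(String connectBaseUrl, String connectorName) {
        // Drop one trailing slash so the path is not doubled.
        String base = connectBaseUrl.endsWith("/")
                ? connectBaseUrl.substring(0, connectBaseUrl.length() - 1)
                : connectBaseUrl;
        return URI.create(base + "/connectors/" + connectorName + "/status");
    }

    /** Fetches the raw JSON status document; requires a reachable Connect worker. */
    static String fetchStatus(String connectBaseUrl, String connectorName)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(statusUri(connectBaseUrl, connectorName))
                .header("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

      Calling ConnectorStatusCheck.fetchStatus("http://localhost:8083", "sink-data-types-logminer-connector") returns the JSON status document, in which the state of the sink task can be compared with the expected and actual results above.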
      

            Unassigned Unassigned
            andrey.pustovetov@gmail.com Andrey Pustovetov