Bug
Resolution: Done
Major
2.6.0.Beta1
Bug report
For bug reports, please provide the following information:
What Debezium connector do you use and what version?
debezium-connector-oracle version 2.6.0.Alpha2
What is the connector configuration?
{ "name": "source-data-types-logminer-connector", "config": { "connector.class": "io.debezium.connector.oracle.OracleConnector", "tasks.max": "1", "database.hostname": "oracle", "database.port": "1521", "database.user": "c##dbzuser", "database.password": "dbz", "database.dbname": "orclcdb", "database.pdb.name": "orclpdb1", "database.connection.adapter": "logminer", "topic.prefix": "logminer", "lob.enabled": "true", "schema.name.adjustment.mode": "avro", "table.include.list": "C##DBZUSER.SOURCE_DATA_TYPES", "include.schema.changes": "false", "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.data-types-logminer", "heartbeat.interval.ms": "60000", "log.mining.strategy": "online_catalog", "log.mining.query.filter.mode": "in", "custom.metric.tags": "connector=source-data-types-connector", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081" } }
{ "name": "sink-data-types-logminer-connector", "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:oracle:thin:@oracle:1521/orclpdb1", "connection.username": "c##dbzuser", "connection.password": "dbz", "topics": "logminer.C__DBZUSER.SOURCE_DATA_TYPES", "insert.mode": "upsert", "delete.enabled": "true", "primary.key.mode": "record_key", "table.name.format": "c##dbzuser.SINK_DATA_TYPES", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081" } }
What is the captured database version and mode of deployment?
Oracle Database 19, Docker
What behaviour do you expect?
The JDBC sink connector successfully processes a record containing a ByteBuffer field value, as deserialized by the AvroConverter for a BYTES schema.
What behaviour do you see?
The JDBC sink connector fails while processing a record containing a ByteBuffer field value, as deserialized by the AvroConverter for a BYTES schema.
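For context, the AvroConverter deserializes Avro `bytes` values into `java.nio.ByteBuffer` for Connect's `BYTES` schema, so the value reaching the sink for the `RAW`/`BLOB` columns is a `ByteBuffer` rather than a `byte[]`. A minimal plain-Java illustration of the value shape (no Connect dependencies, class name is illustrative only):

```java
import java.nio.ByteBuffer;

public class BytesSchemaValue {
    public static void main(String[] args) {
        // The HEXTORAW('7D') value from the source row arrives at the sink
        // wrapped in a ByteBuffer holding the single byte 0x7D.
        ByteBuffer value = ByteBuffer.wrap(new byte[] {0x7D});
        System.out.println(value.get(0)); // prints 125 (0x7D)
    }
}
```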
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
	... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
	at java.base/java.util.HashMap.forEach(HashMap.java:1337)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
	at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
	... 12 more
Caused by: java.lang.RuntimeException: java.sql.SQLException: Invalid column type
	at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:31)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at io.debezium.connector.jdbc.RecordWriter.bindFieldValuesToQuery(RecordWriter.java:158)
	at io.debezium.connector.jdbc.RecordWriter.bindNonKeyValuesToQuery(RecordWriter.java:141)
	at io.debezium.connector.jdbc.RecordWriter.bindValues(RecordWriter.java:115)
	at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:75)
	at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
	at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$4(AbstractSharedSessionContract.java:966)
	at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
	at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:977)
	at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:965)
	at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
	at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
	... 17 more
Caused by: java.sql.SQLException: Invalid column type
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical(OraclePreparedStatement.java:8569)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8051)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8826)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8801)
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:218)
	at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.setObject(NewProxyPreparedStatement.java:821)
	at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:27)
	... 29 more
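The trace suggests the binder passes the `java.nio.ByteBuffer` value straight to `PreparedStatement.setObject`, which the Oracle driver rejects with "Invalid column type". A minimal sketch of the kind of conversion that avoids the error (the `normalize` helper is hypothetical, not Debezium's actual fix, which is in the pull request linked below): extract the bytes into a `byte[]` before binding.

```java
import java.nio.ByteBuffer;

public class ByteBufferBinding {
    // Hypothetical helper: Oracle's PreparedStatement.setObject does not
    // accept java.nio.ByteBuffer, so copy the remaining bytes into a byte[]
    // that the driver can map to RAW/BLOB columns.
    static Object normalize(Object value) {
        if (value instanceof ByteBuffer) {
            // duplicate() so the caller's buffer position is left untouched
            ByteBuffer buffer = ((ByteBuffer) value).duplicate();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return bytes;
        }
        return value;
    }

    public static void main(String[] args) {
        Object bound = normalize(ByteBuffer.wrap(new byte[] {0x7D}));
        System.out.println(bound instanceof byte[]); // prints true
    }
}
```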
Feature request or enhancement
https://github.com/debezium/debezium-connector-jdbc/pull/66
How to reproduce the issue using our tutorial deployment?
1. Create a new source table:
CREATE TABLE c##dbzuser.source_data_types (
    id NUMBER(10) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1 INCREMENT BY 1) NOT NULL PRIMARY KEY,
    c_varchar2 VARCHAR2(400),
    c_nvarchar2 NVARCHAR2(400),
    c_number NUMBER(20),
    c_float FLOAT(63),
    c_decimal DECIMAL(16),
    c_long LONG,
    c_binary_float BINARY_FLOAT,
    c_binary_double BINARY_DOUBLE,
    c_date DATE,
    c_timestamp TIMESTAMP,
    c_timestamp_wtz TIMESTAMP WITH TIME ZONE,
    c_timestamp_wltz TIMESTAMP WITH LOCAL TIME ZONE,
    c_raw RAW(1000),
    c_char CHAR(100),
    c_nchar NCHAR(100),
    c_clob CLOB,
    c_nclob NCLOB,
    c_blob BLOB
);
2. Create a new sink table:
CREATE TABLE c##dbzuser.sink_data_types (
    id NUMBER(10) NOT NULL PRIMARY KEY,
    c_varchar2 VARCHAR2(400),
    c_nvarchar2 NVARCHAR2(400),
    c_number NUMBER(20),
    c_float FLOAT(63),
    c_decimal DECIMAL(16),
    c_long LONG,
    c_binary_float BINARY_FLOAT,
    c_binary_double BINARY_DOUBLE,
    c_date DATE,
    c_timestamp TIMESTAMP,
    c_timestamp_wtz TIMESTAMP WITH TIME ZONE,
    c_timestamp_wltz TIMESTAMP WITH LOCAL TIME ZONE,
    c_raw RAW(1000),
    c_char CHAR(100),
    c_nchar NCHAR(100),
    c_clob CLOB,
    c_nclob NCLOB,
    c_blob BLOB
);
3. Create a new source connector:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d '
{
  "name": "source-data-types-logminer-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "oracle",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "orclcdb",
    "database.pdb.name": "orclpdb1",
    "database.connection.adapter": "logminer",
    "topic.prefix": "logminer",
    "lob.enabled": "true",
    "schema.name.adjustment.mode": "avro",
    "table.include.list": "C##DBZUSER.SOURCE_DATA_TYPES",
    "include.schema.changes": "false",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.data-types-logminer",
    "heartbeat.interval.ms": "60000",
    "log.mining.strategy": "online_catalog",
    "log.mining.query.filter.mode": "in",
    "custom.metric.tags": "connector=source-data-types-connector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}'
4. Create a new sink connector:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors -d '
{
  "name": "sink-data-types-logminer-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@oracle:1521/orclpdb1",
    "connection.username": "c##dbzuser",
    "connection.password": "dbz",
    "topics": "logminer.C__DBZUSER.SOURCE_DATA_TYPES",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.mode": "record_key",
    "table.name.format": "c##dbzuser.SINK_DATA_TYPES",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}'
5. Insert a new record into the source table:
INSERT INTO c##dbzuser.source_data_types (
    c_varchar2, c_nvarchar2, c_number, c_float, c_decimal, c_long,
    c_binary_float, c_binary_double, c_date, c_timestamp, c_timestamp_wtz,
    c_timestamp_wltz, c_raw, c_char, c_nchar, c_clob, c_nclob, c_blob
) VALUES (
    'varchar2', 'nvarchar2', 1, 12.12, 123.123, 1234,
    1.234567E+003, 1.234567E+003,
    TO_DATE('2019-01-30 18:30:52', 'YYYY-MM-DD HH24:MI:SS'),
    TO_TIMESTAMP('2019-01-30 18:30:52.123000', 'YYYY-MM-DD HH24:MI:SS.FF'),
    TO_TIMESTAMP_TZ('2019-01-30 18:30:52.123000 -08:00', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'),
    TO_TIMESTAMP('2019-01-30 18:30:52.123000', 'YYYY-MM-DD HH24:MI:SS.FF'),
    HEXTORAW('7D'), 'char', 'nchar',
    TO_CLOB('clob'), TO_NCLOB('nclob'), TO_BLOB(HEXTORAW('7D'))
);
COMMIT;
6. Check the sink connector status.
Expected result: Running
Actual result: Failed
Caused by: java.sql.SQLException: Invalid column type
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectCritical(OraclePreparedStatement.java:8569)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8051)
	at oracle.jdbc.driver.OraclePreparedStatement.setObjectInternal(OraclePreparedStatement.java:8826)
	at oracle.jdbc.driver.OraclePreparedStatement.setObject(OraclePreparedStatement.java:8801)
	at oracle.jdbc.driver.OraclePreparedStatementWrapper.setObject(OraclePreparedStatementWrapper.java:218)
	at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.setObject(NewProxyPreparedStatement.java:821)
	at io.debezium.connector.jdbc.PreparedStatementQueryBinder.bind(PreparedStatementQueryBinder.java:27)
Links to:
RHEA-2024:139598 Red Hat build of Debezium 2.5.4 release