-
Bug
-
Resolution: Done
-
Major
-
2.7.1.Final
-
None
Bug report
What Debezium connector do you use and what version?
Oracle 2.7.1.Final (EmbeddedEngine)
What is the connector configuration?
connector.class = io.debezium.connector.oracle.OracleConnector database.connection.adapter = logminer database.dbname = ***** database.pdb.name = ***** database.schema = ***** database.tablename.case.insensitive = false database.url = ***** database.user = ***** datatype.propagate.source.type = .* decimal.handling.mode = double errors.max.retries = 0 heartbeat.action.query = ***** include.schema.changes = false lob.enabled = false log.mining.strategy = hybrid max.batch.size = 1000 max.queue.size = 20000 name = sample_oracle offset.flush.interval.ms = 120000 offset.flush.timeout.ms = 30000 offset.storage = org.apache.kafka.connect.storage.MemoryOffsetBackingStore poll.interval.ms = 1000 retriable.restart.connector.wait.ms = 60000 schema.history.internal = io.debezium.relational.history.MemorySchemaHistory schema.history.internal.store.only.captured.tables.ddl = true schemas.enable = true snapshot.fetch.size = 1000 snapshot.lock.timeout.ms = 10000 snapshot.mode = no_data table.include.list = ***** tasks.max = 1 time.precision.mode = adaptive tombstones.on.delete = false topic.prefix = oracle_test transforms = unwrap,timestamp_converter,backendtype,metastore_server_name,source_name,log_seq_no,schema_name_override,table_name_override,db_name_override,trg_schema_name,trg_table_name transforms.backendtype.static.field = __backend_type transforms.backendtype.static.value = NONE transforms.backendtype.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.db_name_override.static.field = __cdc_msg_override_db transforms.db_name_override.static.value = NONE transforms.db_name_override.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.log_seq_no.static.field = __log_seq_no transforms.log_seq_no.static.value = NONE transforms.log_seq_no.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.metastore_server_name.static.field = __metastore_server_name transforms.metastore_server_name.static.value = NONE transforms.metastore_server_name.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.schema_name_override.static.field = __cdc_msg_override_schema transforms.schema_name_override.static.value = NONE transforms.schema_name_override.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.source_name.static.field = __source_name transforms.source_name.static.value = NONE transforms.source_name.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.table_name_override.static.field = __cdc_msg_override_table transforms.table_name_override.static.value = NONE transforms.table_name_override.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.timestamp_converter.field = __source_ts_ms transforms.timestamp_converter.format = yyyy-MM-dd HH:mm:ss.SSS transforms.timestamp_converter.target.type = string transforms.timestamp_converter.type = org.apache.kafka.connect.transforms.TimestampConverter$Value transforms.trg_schema_name.static.field = __cdc_msg_target_schema transforms.trg_schema_name.static.value = NONE transforms.trg_schema_name.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.trg_table_name.static.field = __cdc_msg_target_table transforms.trg_table_name.static.value = NONE transforms.trg_table_name.type = org.apache.kafka.connect.transforms.InsertField$Value transforms.unwrap.add.fields = source.db,source.table,op,source.ts_ms:source_ts_ms,source.schema,source.snapshot,source.scn,source.commit_scn transforms.unwrap.delete.tombstone.handling.mode = rewrite transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState
What is the captured database version and mode of deployment?
On-prem Oracle Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production Version 19.23.0.0.0
What behavior do you expect?
After an engine has been stopped by calling close(), it should be able to start again when calling run() from the same DebeziumEngine<RecordChangeEvent<SourceRecord> object.
What behavior do you see?
The engine starts again when calling run() after it has stopped (close()).
Do you see the same behaviour using the latest released Debezium version?
We encounter the NullPointerException on the latest stable (2.7.1.Final) version. This does not occur on 2.7.0.Final.
Do you have the connector logs, ideally from start till finish?
... INFO [2024-08-29 12:19:50,138] io.debezium.relational.history.SchemaHistoryMetrics: Already applied 9 database changes INFO [2024-08-29 12:19:50,221] io.debezium.relational.RelationalSnapshotChangeEventSource: Snapshot step 7 - Skipping snapshotting of data INFO [2024-08-29 12:19:50,360] io.debezium.connector.common.BaseSourceTask: 1 records sent during previous 00:01:04.809, last recorded offset of {server=*****} partition is {commit_scn=null, snapshot_scn=53281660283, scn=53281660283} INFO [2024-08-29 12:19:50,360] io.debezium.embedded.EmbeddedEngine: Stopping the task and engine INFO [2024-08-29 12:19:50,360] io.debezium.connector.common.BaseSourceTask: Stopping down connector INFO [2024-08-29 12:19:50,432] io.debezium.pipeline.source.AbstractSnapshotChangeEventSource: Snapshot - Final stage INFO [2024-08-29 12:19:50,611] io.debezium.pipeline.source.AbstractSnapshotChangeEventSource: Snapshot completed INFO [2024-08-29 12:19:50,611] io.debezium.pipeline.ChangeEventSourceCoordinator: Snapshot ended with SnapshotResult [status=COMPLETED, offset=OracleOffsetContext [scn=53281660283, commit_scn=[], lcr_position=null]] INFO [2024-08-29 12:19:50,612] io.debezium.pipeline.ChangeEventSourceCoordinator: Connected metrics set to 'false' INFO [2024-08-29 12:19:50,612] io.debezium.util.Threads: Creating thread debezium-oracleconnector-*****-SignalProcessor INFO [2024-08-29 12:19:50,612] io.debezium.pipeline.signal.SignalProcessor: SignalProcessor stopped INFO [2024-08-29 12:19:50,735] io.debezium.jdbc.JdbcConnection: Connection gracefully closed INFO [2024-08-29 12:19:50,736] io.debezium.service.DefaultServiceRegistry: Debezium ServiceRegistry stopped. INFO [2024-08-29 12:19:50,844] io.debezium.jdbc.JdbcConnection: Connection gracefully closed INFO [2024-08-29 12:19:50,982] io.debezium.jdbc.JdbcConnection: Connection gracefully closed ERROR [2024-08-29 12:19:50,982] io.debezium.embedded.EmbeddedEngine: Stopping connector after error in the application's handler method: Cannot invoke "org.apache.kafka.common.cache.Cache.get(Object)" because "this.schemaUpdateCache" is null ! java.lang.NullPointerException: Cannot invoke "org.apache.kafka.common.cache.Cache.get(Object)" because "this.schemaUpdateCache" is null ! at org.apache.kafka.connect.transforms.InsertField.applyWithSchema(InsertField.java:173) ! at org.apache.kafka.connect.transforms.InsertField.apply(InsertField.java:142) ! at io.debezium.embedded.Transformations.transform(Transformations.java:92) ! at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ! at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ! at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ! at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ! at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) ! at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ! at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682) ! at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:733) ! at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:470) ! at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:163) ! at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ! at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ! at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ! at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ! at java.base/java.lang.Thread.run(Thread.java:833)
How to reproduce the issue using our tutorial deployment?
1. Call run() on DebeziumEngine<RecordChangeEvent<SourceRecord>> to start the engine. The engine should connect to Oracle database and pick up record changes.
2. Call close() on DebeziumEngine<RecordChangeEvent<SourceRecord>> to stop the engine.
3. Call run() again on the same DebeziumEngine<RecordChangeEvent<SourceRecord>>, a NullPointerException (schemaUpdateCache is null) should be thrown.
- links to
-
RHEA-2024:139598 Red Hat build of Debezium 2.5.4 release