Bug
Resolution: Done
Major
2.7.0.Final
None

Bug report
Hi there, we have recently updated our connectors and started using the MariaDbConnector with Debezium 2.7.0 and the embedded engine.
During the migration we noticed a problem while parsing BLOB columns from the source database.
Let's say I have the following table and insert a single record:
CREATE TABLE IF NOT EXISTS `test_blob_table` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `body` BLOB NOT NULL,
  `status` varchar(30) NOT NULL,
  `time_created` DATETIME DEFAULT NOW(),
  PRIMARY KEY (`id`)
);

INSERT INTO `test_blob_table` (`body`, `status`, `time_created`)
VALUES ('Test BLOB Data', 'active', NOW());
What Debezium connector do you use and what version?
MariaDB connector with the embedded engine, 2.7.0.Final
What is the connector configuration?
props.setProperty("name", applicationName);
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
props.setProperty("offset.storage.topic", systemTopicPrefix + ".offset_storage");
props.setProperty("offset.storage.partitions", "1");
props.setProperty("offset.storage.replication.factor", "2");
props.setProperty("offset.flush.interval.ms", "20000"); // 20 seconds
props.setProperty("offset.flush.timeout.ms", "10000"); // 10 seconds
props.setProperty("topic.prefix", topicPrefix);
props.setProperty("database.server.id", generateServerId(applicationName));
props.setProperty("schema.history.internal.kafka.bootstrap.servers", bootstrapServers);
props.setProperty("schema.history.internal.kafka.topic", systemTopicPrefix + ".db_history");
props.setProperty("snapshot.max.threads", String.valueOf(snapshotMaxThreads));
props.setProperty("signal.enabled.channels", "kafka");
props.setProperty("signal.kafka.topic", systemTopicPrefix + ".debezium_signals");
props.setProperty("signal.kafka.bootstrap.servers", bootstrapServers);
props.setProperty("connector.class", "io.debezium.connector.mariadb.MariaDbConnector");
props.setProperty("connector.adapter", "mariadb");
props.setProperty("database.hostname", debeziumSourceDatabaseConfig.getHostname());
props.setProperty("database.port", debeziumSourceDatabaseConfig.getPort());
props.setProperty("database.user", debeziumSourceDatabaseConfig.getUsername());
props.setProperty("database.password", debeziumSourceDatabaseConfig.getPassword());
props.setProperty("database.server.name", applicationName.replace('.', '_'));
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
props.setProperty("database.history.producer.max.request.size", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
props.setProperty("database.history.producer.buffer.memory", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
props.setProperty("database.history.consumer.max.partition.fetch.bytes", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
props.setProperty("database.history.store.only.captured.tables.ddl", String.valueOf(false));
props.setProperty("database.include.list", debeziumSourceDatabaseConfig.getSchemaWhitelist());
props.setProperty("database.timezone", "disable");
props.setProperty("database.protocol", "jdbc:mariadb");
props.setProperty("database.jdbc.driver", "org.mariadb.jdbc.Driver");
props.setProperty("database.ssl.mode", "disabled");
props.setProperty("snapshot.mode", debeziumSourceDatabaseConfig.getSnapshotMode());
props.setProperty("include.schema.changes", "true");
props.setProperty("connect.keep.alive", "true");
props.setProperty("connections.max.idle.ms", "540000");
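For context, the properties above are handed to the embedded engine via the standard `io.debezium.engine.DebeziumEngine` bootstrap; a minimal sketch (the record handler and executor lifecycle are simplified, and the `EngineBootstrap` class name is just for illustration):

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class EngineBootstrap {

    public static DebeziumEngine<ChangeEvent<String, String>> start(Properties props) {
        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> {
                    // Each snapshot or binlog row arrives here as a JSON change event.
                    System.out.println(record.value());
                })
                .build();
        // The engine is a Runnable; it runs until it is closed.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        return engine;
    }
}
```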
What is the captured database version and mode of deployment?
On-prem.
What behavior do you expect?
During the snapshot of a table that has a BLOB column, the BLOB value should be parsed successfully.
What behavior do you see?
The test table above was created with a column of type BLOB, and a test row was inserted. I then started my MariaDB connector with an initial snapshot, and the snapshot produced an error. Note that when not in snapshot mode, the same record was processed successfully during normal binlog reading.
Error logs received:
13:52:08.439 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 7 - Snapshotting data
13:52:08.440 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Creating snapshot worker pool with 1 worker thread(s)
13:52:08.441 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - For table 'social.test_blob_table' using select statement: 'SELECT id, body, status, time_created FROM social.test_blob_table'
13:52:08.453 INFO io.debezium.connector.binlog.BinlogSnapshotChangeEventSource - Estimated row count for table social.test_blob_table is OptionalLong[0]
13:52:08.455 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exporting data from table 'social.test_blob_table' (1 of 1 tables)
13:52:08.478 WARN io.debezium.connector.mariadb.jdbc.MariaDbValueConverters - Unexpected JDBC BINARY value for field body with schema Schema{BYTES}: class=class org.mariadb.jdbc.MariaDbBlob, value=org.mariadb.jdbc.MariaDbBlob@445f67b6
13:52:08.479 WARN io.debezium.relational.TableSchemaBuilder - Failed to properly convert data value for 'social.test_blob_table.body' of type BLOB
org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:328)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:87)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:216)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:641)
at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:575)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
13:52:08.481 ERROR io.debezium.relational.RelationalSnapshotChangeEventSource - Error during snapshot
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:523)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:179)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:96)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:285)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:269)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:192)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
at io.debezium.data.Envelope.read(Envelope.java:260)
at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:88)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:216)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:641)
at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:575)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
... 5 common frames omitted
13:52:08.486 INFO io.debezium.pipeline.source.AbstractSnapshotChangeEventSource - Snapshot - Final stage
13:52:08.487 WARN io.debezium.pipeline.source.AbstractSnapshotChangeEventSource - Snapshot was not completed successfully, it will be re-executed upon connector restart
13:52:08.487 ERROR io.debezium.pipeline.ErrorHandler - Producer failure
io.debezium.DebeziumException: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:105)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:285)
at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:269)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:192)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:523)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:179)
at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:96)
... 9 common frames omitted
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
at io.debezium.data.Envelope.read(Envelope.java:260)
at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:88)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:216)
at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:641)
at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:575)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
... 5 common frames omitted
13:52:08.865 INFO io.debezium.embedded.EmbeddedEngine - Stopping the task and engine
13:52:08.865 INFO io.debezium.connector.common.BaseSourceTask - Stopping down connector
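The warning in the log above suggests the value converter receives an org.mariadb.jdbc.MariaDbBlob it does not unwrap during the snapshot, converts it to null, and Kafka Connect's schema validation then rejects the null for the required BYTES field. For reference, materializing a java.sql.Blob into a byte[] takes only the JDK; the sketch below uses javax.sql.rowset.serial.SerialBlob (the JDK's in-memory Blob) as a stand-in for the driver's MariaDbBlob, and the BlobMaterializer class name is hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.SQLException;
import javax.sql.rowset.serial.SerialBlob;

public class BlobMaterializer {

    // Copy a java.sql.Blob into a byte[]; Blob positions are 1-based.
    public static byte[] toBytes(Blob blob) throws SQLException {
        try {
            return blob.getBytes(1, (int) blob.length());
        } finally {
            blob.free(); // release driver-side resources once copied
        }
    }

    public static void main(String[] args) throws SQLException {
        // Stand-in for the MariaDbBlob returned by the snapshot SELECT.
        Blob blob = new SerialBlob("Test BLOB Data".getBytes(StandardCharsets.UTF_8));
        byte[] body = toBytes(blob);
        System.out.println(new String(body, StandardCharsets.UTF_8)); // prints "Test BLOB Data"
    }
}
```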
Zulipchat topic: https://debezium.zulipchat.com/#narrow/stream/302529-community-general/topic/Mariadb.20connector.2E.20Snapshot.20fails.20to.20parse.20BLOB.20column
Links to: RHEA-2024:139598 Red Hat build of Debezium 2.5.4 release