Debezium / DBZ-8076

Support capturing BLOB column types during snapshot for MySQL/MariaDB


    Priority: Critical

      Type: Bug report

      Hi there, we recently updated our connectors and started using the MariaDB connector with Debezium 2.7.0 and the embedded engine.

      During the migration we noticed a problem while trying to parse BLOB columns from the source database.
      Say I have the following table, into which I insert a single record:
       
      CREATE TABLE IF NOT EXISTS `test_blob_table` (
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
        `body` BLOB NOT NULL,
        `status` varchar(30) NOT NULL,
        `time_created` DATETIME DEFAULT NOW(),
        PRIMARY KEY (`id`)
      );

      INSERT INTO `test_blob_table` (`body`, `status`, `time_created`)
      VALUES ('Test BLOB Data', 'active', NOW());

      What Debezium connector do you use and what version?

      MariaDB connector with the embedded engine, 2.7.0.Final

      What is the connector configuration?

      props.setProperty("name", applicationName);
      props.setProperty("bootstrap.servers", bootstrapServers);
      props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");
      props.setProperty("offset.storage.topic", systemTopicPrefix + ".offset_storage");
      props.setProperty("offset.storage.partitions", "1");
      props.setProperty("offset.storage.replication.factor", "2");
      props.setProperty("offset.flush.interval.ms", "20000"); // 20 seconds
      props.setProperty("offset.flush.timeout.ms", "10000"); // 10 seconds
      props.setProperty("topic.prefix", topicPrefix);
      props.setProperty("database.server.id", generateServerId(applicationName));
      props.setProperty("schema.history.internal.kafka.bootstrap.servers", bootstrapServers);
      props.setProperty("schema.history.internal.kafka.topic", systemTopicPrefix + ".db_history");
      props.setProperty("snapshot.max.threads", String.valueOf(snapshotMaxThreads));
      props.setProperty("signal.enabled.channels", "kafka");
      props.setProperty("signal.kafka.topic", systemTopicPrefix + ".debezium_signals");
      props.setProperty("signal.kafka.bootstrap.servers", bootstrapServers);

      props.setProperty("connector.class", "io.debezium.connector.mariadb.MariaDbConnector");
      props.setProperty("connector.adapter", "mariadb");
      props.setProperty("database.hostname", debeziumSourceDatabaseConfig.getHostname());
      props.setProperty("database.port", debeziumSourceDatabaseConfig.getPort());
      props.setProperty("database.user", debeziumSourceDatabaseConfig.getUsername());
      props.setProperty("database.password", debeziumSourceDatabaseConfig.getPassword());
      props.setProperty("database.server.name", applicationName.replace('.', '_'));
      props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
      props.setProperty("database.history.producer.max.request.size", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
      props.setProperty("database.history.producer.buffer.memory", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
      props.setProperty("database.history.consumer.max.partition.fetch.bytes", String.valueOf(MAX_HISTORY_MESSAGE_SIZE));
      props.setProperty("database.history.store.only.captured.tables.ddl", String.valueOf(false));
      props.setProperty("database.include.list", debeziumSourceDatabaseConfig.getSchemaWhitelist());
      props.setProperty("database.timezone", "disable");
      props.setProperty("database.protocol", "jdbc:mariadb");
      props.setProperty("database.jdbc.driver", "org.mariadb.jdbc.Driver");
      props.setProperty("database.ssl.mode", "disabled");
      props.setProperty("snapshot.mode", debeziumSourceDatabaseConfig.getSnapshotMode());
      props.setProperty("include.schema.changes", "true");
      props.setProperty("connect.keep.alive", "true");
      props.setProperty("connections.max.idle.ms", "540000");

      What is the captured database version and mode of deployment?

      on-prem. 

      What behavior do you expect?

      During the snapshot of a table that has a BLOB column, the BLOB value should be parsed successfully.

      What behavior do you see?

      The test table mentioned above was created with a column of type BLOB, and I inserted a test row. I then started my MariaDB connector with an initial snapshot, and the snapshot produced an error. Note that outside snapshot mode, the same record is processed successfully during normal binlog reading.
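Conceptually, the snapshot path reads rows over plain JDBC, where the MariaDB driver hands the BLOB column back as an `org.mariadb.jdbc.MariaDbBlob` (a `java.sql.Blob`) rather than a `byte[]`, while the binlog path already yields `byte[]`. The sketch below is a hypothetical stand-in for a byte[]-only binary conversion, not Debezium's actual `MariaDbValueConverters` code; it only illustrates why the value falls through to `null` and then fails Connect's required-field validation:

```java
import java.sql.Blob;
import javax.sql.rowset.serial.SerialBlob;

public class ConverterFallthrough {
    // Hypothetical stand-in for a byte[]-only binary conversion; the real
    // converter logic is more involved, but the failure mode is the same.
    static Object convertBinary(Object value) {
        if (value instanceof byte[]) {
            return value;
        }
        // An unexpected JDBC type (e.g. org.mariadb.jdbc.MariaDbBlob) falls
        // through to null, which Connect then rejects for a required field.
        return null;
    }

    public static void main(String[] args) throws Exception {
        // SerialBlob (from the JDK) stands in for MariaDbBlob here.
        Blob snapshotValue = new SerialBlob("Test BLOB Data".getBytes());
        byte[] binlogValue = "Test BLOB Data".getBytes();
        System.out.println(convertBinary(snapshotValue));         // null
        System.out.println(convertBinary(binlogValue) != null);   // true
    }
}
```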

      Error logs received:
      13:52:08.439 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Snapshot step 7 - Snapshotting data
      13:52:08.440 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Creating snapshot worker pool with 1 worker thread(s)
      13:52:08.441 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - For table 'social.test_blob_table' using select statement: 'SELECT `id`, `body`, `status`, `time_created` FROM `social`.`test_blob_table`'
      13:52:08.453 INFO io.debezium.connector.binlog.BinlogSnapshotChangeEventSource - Estimated row count for table social.test_blob_table is OptionalLong[0]
      13:52:08.455 INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exporting data from table 'social.test_blob_table' (1 of 1 tables)
      13:52:08.478 WARN io.debezium.connector.mariadb.jdbc.MariaDbValueConverters - Unexpected JDBC BINARY value for field body with schema Schema{BYTES}: class=class org.mariadb.jdbc.MariaDbBlob, value=org.mariadb.jdbc.MariaDbBlob@445f67b6
      13:52:08.479 WARN io.debezium.relational.TableSchemaBuilder - Failed to properly convert data value for 'social.test_blob_table.body' of type BLOB
      org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
      at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:328)
      at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
      at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:87)
      at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
      at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:216)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:641)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:575)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:840)
      13:52:08.481 ERROR io.debezium.relational.RelationalSnapshotChangeEventSource - Error during snapshot
      java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
      at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:523)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:179)
      at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:96)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:285)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:269)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:192)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:840)
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
      at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
      at io.debezium.data.Envelope.read(Envelope.java:260)
      at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:88)
      at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
      at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:216)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:641)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:575)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      ... 5 common frames omitted
      13:52:08.486 INFO io.debezium.pipeline.source.AbstractSnapshotChangeEventSource - Snapshot - Final stage
      13:52:08.487 WARN io.debezium.pipeline.source.AbstractSnapshotChangeEventSource - Snapshot was not completed successfully, it will be re-executed upon connector restart
      13:52:08.487 ERROR io.debezium.pipeline.ErrorHandler - Producer failure
      io.debezium.DebeziumException: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
      at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:105)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:285)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:269)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:192)
      at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:143)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:840)
      Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
      at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:523)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:179)
      at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:96)
      ... 9 common frames omitted
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "body", schema type: BYTES
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
      at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
      at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
      at io.debezium.data.Envelope.read(Envelope.java:260)
      at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:88)
      at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
      at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:216)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:641)
      at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:575)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      ... 5 common frames omitted
      13:52:08.865 INFO io.debezium.embedded.EmbeddedEngine - Stopping the task and engine
      13:52:08.865 INFO io.debezium.connector.common.BaseSourceTask - Stopping down connector
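The fix direction implied by the warning above is to materialize the `java.sql.Blob` before schema validation. This is a hedged sketch under that assumption (the class and method names are mine, not Debezium's), using the JDK's `SerialBlob` as a stand-in for `MariaDbBlob`:

```java
import java.sql.Blob;
import java.sql.SQLException;
import javax.sql.rowset.serial.SerialBlob;

public class BlobUnwrapSketch {
    // Unwrap whatever the driver hands back into the byte[] that the
    // Connect BYTES schema expects. Hypothetical sketch, not Debezium code.
    static byte[] toBytes(Object value) {
        if (value instanceof byte[]) {
            return (byte[]) value; // binlog path already yields byte[]
        }
        if (value instanceof Blob) {
            try {
                Blob blob = (Blob) value;
                // Blob offsets are 1-based per the JDBC spec.
                return blob.getBytes(1, (int) blob.length());
            }
            catch (SQLException e) {
                throw new IllegalStateException("Could not materialize BLOB", e);
            }
        }
        throw new IllegalArgumentException("Unsupported binary representation: " + value);
    }

    public static void main(String[] args) throws Exception {
        // SerialBlob (from the JDK) stands in for org.mariadb.jdbc.MariaDbBlob.
        Blob blob = new SerialBlob("Test BLOB Data".getBytes());
        System.out.println(new String(toBytes(blob))); // prints "Test BLOB Data"
    }
}
```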

      Zulipchat topic: https://debezium.zulipchat.com/#narrow/stream/302529-community-general/topic/Mariadb.20connector.2E.20Snapshot.20fails.20to.20parse.20BLOB.20column

            Assignee: Chris Cranford (ccranfor@redhat.com)
            Reporter: Dimitar Hristov (amanis)
