Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-7045

Custom convert (all to strings) and SQLServer default '0' type issue

XMLWordPrintable

    • Important

      Debezium server with custom converter fails when loading sql server default value 0 (zero). The custom converter converts all values to strings.

       

      Link to debezium email group https://groups.google.com/g/debezium/c/m8k_p8O7dt4

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-server-dist-2.4.0.Final

      What is the connector configuration?

       

      debezium.sink.type=http # Sink that just get all events and prints to screen for testing debezium.sink.http.url=http://127.0.0.1:5555/ 
      debezium.sink.topic.prefix=all_strings_cdc debezium.source.topic.prefix=all_strings_cdc debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector debezium.source.offset.storage.file.filename=./data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.database.hostname=<redacted> debezium.source.database.port=<redacted> debezium.source.database.user=<redacted> debezium.source.database.password=<redacted> debezium.source.database.names=aw2019 debezium.source.database.history.file.filename=./data/FileDatabaseHistory.dat debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory debezium.source.database.encrypt=false debezium.source.table.include.list=Person.Person quarkus.log.console.json=false quarkus.log.level=DEBUG 
      debezium.format.value=json schema.history.internal.kafka.topic=test1 
      debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory debezium.source.schema.history.internal.file.filename=data/schema_history.dat 
      debezium.source.converters=debezium.NullConverter debezium.source.debezium.NullConverter.type=debezium.NullConverter debezium.source.debezium.NullConverter.debug=true debezium.source.debezium.NullConverter.nullPlaceholder=[null] 
      debezium.source.transforms=unwrap debezium.source.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState debezium.source.transforms.unwrap.delete.handling.mode=rewrite debezium.source.transforms.unwrap.add.fields=op,table,source.ts_ms debezium.source.key.converter.schemas.enable=false debezium.source.value.converter.schemas.enable=false

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      SQLServer from GCP Cloud SQL - SQL Server 2019 Standard

      The table in question is the aw2019.Person.Person, the column that fails 

       EmailPromotion int DEFAULT 0 NOT NULL,

      Full DDL here:

       

      CREATE TABLE aw2019.Person.Person (	BusinessEntityID int NOT NULL,	PersonType nchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,	NameStyle NameStyle DEFAULT 0 NOT NULL,	Title nvarchar(8) COLLATE SQL_Latin1_General_CP1_CI_AS DEFAULT '' NULL,	FirstName Name COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,	MiddleName Name COLLATE SQL_Latin1_General_CP1_CI_AS NULL,	LastName Name COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,	Suffix nvarchar(10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,	EmailPromotion int DEFAULT 0 NOT NULL,	AdditionalContactInfo xml DEFAULT '' NULL,	Demographics xml DEFAULT '' NULL,	rowguid uniqueidentifier DEFAULT newid() NOT NULL,	ModifiedDate datetime DEFAULT getdate() NOT NULL,	CONSTRAINT PK_Person_BusinessEntityID PRIMARY KEY (BusinessEntityID));CREATE UNIQUE NONCLUSTERED INDEX AK_Person_rowguid ON aw2019.Person.Person (rowguid);CREATE NONCLUSTERED INDEX IX_Person_LastName_FirstName_MiddleName ON aw2019.Person.Person (LastName, FirstName, MiddleName);CREATE INDEX PXML_Person_AddContact ON aw2019.Person.Person (AdditionalContactInfo);CREATE INDEX PXML_Person_Demographics ON aw2019.Person.Person (Demographics);CREATE INDEX XMLPATH_Person_Demographics ON aw2019.Person.Person (Demographics);CREATE INDEX XMLPROPERTY_Person_Demographics ON aw2019.Person.Person (Demographics);CREATE INDEX XMLVALUE_Person_Demographics ON aw2019.Person.Person (Demographics);ALTER TABLE aw2019.Person.Person WITH NOCHECK ADD CONSTRAINT CK_Person_EmailPromotion CHECK ([EmailPromotion]>=(0) AND [EmailPromotion]<=(2));ALTER TABLE aw2019.Person.Person WITH NOCHECK ADD CONSTRAINT CK_Person_PersonType CHECK ([PersonType] IS NULL OR (upper([PersonType])='GC' OR upper([PersonType])='SP' OR upper([PersonType])='EM' OR upper([PersonType])='IN' OR upper([PersonType])='VC' OR upper([PersonType])='SC'));// code placeholder
      

       

      The custom converter code:

       

      public class NullConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
       
       private SchemaBuilder stringSchema;
      
       public void configure(Properties props) {
       stringSchema = SchemaBuilder.string(); }
      
       public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
       Converter converter = this::anyToString;
       registration.register(stringSchema, converter);
       }
      
       private String anyToString(Object rawValue) {
       String converted;
       if (rawValue == null) {
       converted = "[null]";
       } else {
       converted = rawValue.toString();
       }
      
       return converted;
       }
      }
      

       

       

      What behaviour do you expect?

      To not fail and convert 0 as a string.

      What behaviour do you see?

      Crash on line 404 in The TableSchemaBuilder.java 
      https://github.com/debezium/debezium/blob/2.3/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java#L404

      403 | fieldBuilder
      404 | .defaultValue(customConverterRegistry.getValueConverter(table.id(), column)
      405 | .orElse(ValueConverter.passthrough()).convert(defaultValue));

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with the latest Alpha/Beta/CR version)

      No, I tried with the latest official version.

      Do you have the connector logs, ideally from start to finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Yes.

      The excerpt: 

       

      2023-10-16 11:58:02,533 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Error while trying to run connector class 'io.debezium.connector.sqlserver.SqlServerConnector'', error = 'org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.': org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67)	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:144)	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)	at java.base/java.lang.Thread.run(Thread.java:1623)Caused by: io.debezium.DebeziumException: io.debezium.DebeziumException: io.debezium.DebeziumException: Failed to set field default value for 'aw2019.Person.Person.EmailPromotion' of type int, the default value is 0 of type class java.lang.Integer	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:101)	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)	at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)	at io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator.executeChangeEventSources(SqlServerChangeEventSourceCoordinator.java:82)	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)	... 5 moreCaused by: io.debezium.DebeziumException: io.debezium.DebeziumException: Failed to set field default value for 'aw2019.Person.Person.EmailPromotion' of type int, the default value is 0 of type class java.lang.Integer	at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$3(RelationalSnapshotChangeEventSource.java:387)	at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:367)	at io.debezium.relational.RelationalSnapshotChangeEventSource.createSchemaChangeEventsForTables(RelationalSnapshotChangeEventSource.java:382)	at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:154)	at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)	... 9 moreCaused by: io.debezium.DebeziumException: Failed to set field default value for 'aw2019.Person.Person.EmailPromotion' of type int, the default value is 0 of type class java.lang.Integer	at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:411)	at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)	at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:147)	at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:122)	at io.debezium.connector.sqlserver.SqlServerDatabaseSchema.applySchemaChange(SqlServerDatabaseSchema.java:53)	at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:622)	at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$3(RelationalSnapshotChangeEventSource.java:384)	... 13 moreCaused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid SchemaBuilder call: default has already been set.	at org.apache.kafka.connect.data.SchemaBuilder.checkCanSet(SchemaBuilder.java:437)	at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:126)	at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:405)	... 28 more
      2023-10-16 11:58:02,535 DEBUG [io.qua.run.Application] (main) Stopping application 

       

       

      How to reproduce the issue using our tutorial deployment?

      The tutorial doesn't cover debezium server use case. 

       

              Unassigned Unassigned
              liudasfastloop Liudas Sodonis (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: