Bug
Resolution: Done
Major
2.3.4.Final, 2.4.0.Final
None
Debezium Server with a custom converter fails when loading a SQL Server column whose default value is 0 (zero). The custom converter converts all values to strings.
Link to the Debezium mailing list thread: https://groups.google.com/g/debezium/c/m8k_p8O7dt4
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
debezium-server-dist-2.4.0.Final
What is the connector configuration?
# Sink that just get all events and prints to screen for testing
debezium.sink.type=http
debezium.sink.http.url=http://127.0.0.1:5555/
debezium.sink.topic.prefix=all_strings_cdc
debezium.source.topic.prefix=all_strings_cdc
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=./data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<redacted>
debezium.source.database.port=<redacted>
debezium.source.database.user=<redacted>
debezium.source.database.password=<redacted>
debezium.source.database.names=aw2019
debezium.source.database.history.file.filename=./data/FileDatabaseHistory.dat
debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.encrypt=false
debezium.source.table.include.list=Person.Person
quarkus.log.console.json=false
quarkus.log.level=DEBUG
debezium.format.value=json
schema.history.internal.kafka.topic=test1
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
debezium.source.converters=debezium.NullConverter
debezium.source.debezium.NullConverter.type=debezium.NullConverter
debezium.source.debezium.NullConverter.debug=true
debezium.source.debezium.NullConverter.nullPlaceholder=[null]
debezium.source.transforms=unwrap
debezium.source.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.source.transforms.unwrap.delete.handling.mode=rewrite
debezium.source.transforms.unwrap.add.fields=op,table,source.ts_ms
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
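The converter entries in this configuration follow Debezium's naming scheme: `converters` lists symbolic names, and each name has a matching `<name>.type` key holding the class name. Here the symbolic name happens to be `debezium.NullConverter`, the same string as the class name. A minimal sketch of that lookup, using only `java.util.Properties`, might look like the following. The class `ConverterConfigSketch` and its methods are hypothetical and are not Debezium's actual loading code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; illustrates the key layout only, not Debezium internals.
class ConverterConfigSketch {

    static final String SOURCE_PREFIX = "debezium.source.";

    // Maps each symbolic converter name to the class named by "<name>.type".
    static Map<String, String> converterTypes(Properties serverProps) {
        Map<String, String> result = new LinkedHashMap<>();
        String names = serverProps.getProperty(SOURCE_PREFIX + "converters", "");
        for (String name : names.split(",")) {
            name = name.trim();
            if (name.isEmpty()) {
                continue;
            }
            result.put(name, serverProps.getProperty(SOURCE_PREFIX + name + ".type"));
        }
        return result;
    }

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
            "debezium.source.converters=debezium.NullConverter\n"
            + "debezium.source.debezium.NullConverter.type=debezium.NullConverter\n");
        System.out.println(converterTypes(props));
    }
}
```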
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
SQLServer from GCP Cloud SQL - SQL Server 2019 Standard
The table in question is aw2019.Person.Person; the column that fails is:
EmailPromotion int DEFAULT 0 NOT NULL,
Full DDL here:
CREATE TABLE aw2019.Person.Person (
    BusinessEntityID int NOT NULL,
    PersonType nchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
    NameStyle NameStyle DEFAULT 0 NOT NULL,
    Title nvarchar(8) COLLATE SQL_Latin1_General_CP1_CI_AS DEFAULT '' NULL,
    FirstName Name COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
    MiddleName Name COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    LastName Name COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,
    Suffix nvarchar(10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    EmailPromotion int DEFAULT 0 NOT NULL,
    AdditionalContactInfo xml DEFAULT '' NULL,
    Demographics xml DEFAULT '' NULL,
    rowguid uniqueidentifier DEFAULT newid() NOT NULL,
    ModifiedDate datetime DEFAULT getdate() NOT NULL,
    CONSTRAINT PK_Person_BusinessEntityID PRIMARY KEY (BusinessEntityID)
);
CREATE UNIQUE NONCLUSTERED INDEX AK_Person_rowguid ON aw2019.Person.Person (rowguid);
CREATE NONCLUSTERED INDEX IX_Person_LastName_FirstName_MiddleName ON aw2019.Person.Person (LastName, FirstName, MiddleName);
CREATE INDEX PXML_Person_AddContact ON aw2019.Person.Person (AdditionalContactInfo);
CREATE INDEX PXML_Person_Demographics ON aw2019.Person.Person (Demographics);
CREATE INDEX XMLPATH_Person_Demographics ON aw2019.Person.Person (Demographics);
CREATE INDEX XMLPROPERTY_Person_Demographics ON aw2019.Person.Person (Demographics);
CREATE INDEX XMLVALUE_Person_Demographics ON aw2019.Person.Person (Demographics);
ALTER TABLE aw2019.Person.Person WITH NOCHECK ADD CONSTRAINT CK_Person_EmailPromotion CHECK ([EmailPromotion]>=(0) AND [EmailPromotion]<=(2));
ALTER TABLE aw2019.Person.Person WITH NOCHECK ADD CONSTRAINT CK_Person_PersonType CHECK ([PersonType] IS NULL OR (upper([PersonType])='GC' OR upper([PersonType])='SP' OR upper([PersonType])='EM' OR upper([PersonType])='IN' OR upper([PersonType])='VC' OR upper([PersonType])='SC'));
The custom converter code:
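package debezium;

import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

public class NullConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // Single string schema builder, created once in configure().
    private SchemaBuilder stringSchema;

    public void configure(Properties props) {
        stringSchema = SchemaBuilder.string();
    }

    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        Converter converter = this::anyToString;
        // The same SchemaBuilder instance is registered for every column.
        registration.register(stringSchema, converter);
    }

    // Converts any value to its string form; null becomes the "[null]" placeholder.
    private String anyToString(Object rawValue) {
        String converted;
        if (rawValue == null) {
            converted = "[null]";
        } else {
            converted = rawValue.toString();
        }
        return converted;
    }
}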
What behaviour do you expect?
The connector should not fail, and the default value 0 should be converted to the string "0".
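For reference, the expected conversion can be checked in isolation. The sketch below copies the logic of the converter's `anyToString` method into a stand-alone class with no Debezium types; the class name `AnyToStringSketch` is made up for illustration.

```java
// Stand-alone copy of the converter's conversion logic; no Debezium types needed.
class AnyToStringSketch {

    // Same rule as the converter: null becomes "[null]", anything else uses toString().
    static String anyToString(Object rawValue) {
        return rawValue == null ? "[null]" : rawValue.toString();
    }

    public static void main(String[] args) {
        System.out.println(anyToString(0));    // prints 0
        System.out.println(anyToString(null)); // prints [null]
    }
}
```

The conversion itself handles an `Integer` 0 without trouble, which suggests the failure comes from how the schema default is set rather than from the value conversion.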
What behaviour do you see?
A crash at line 404 of TableSchemaBuilder.java:
https://github.com/debezium/debezium/blob/2.3/debezium-core/src/main/java/io/debezium/relational/TableSchemaBuilder.java#L404
–
403 | fieldBuilder
404 | .defaultValue(customConverterRegistry.getValueConverter(table.id(), column)
405 | .orElse(ValueConverter.passthrough()).convert(defaultValue));
–
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with the latest Alpha/Beta/CR version)
No, I tried with the latest official version.
Do you have the connector logs, ideally from start to finish?
(You might be asked later to provide DEBUG/TRACE level log)
Yes.
The excerpt:
2023-10-16 11:58:02,533 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Error while trying to run connector class 'io.debezium.connector.sqlserver.SqlServerConnector'', error = 'org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.': org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:144)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:577)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: io.debezium.DebeziumException: io.debezium.DebeziumException: io.debezium.DebeziumException: Failed to set field default value for 'aw2019.Person.Person.EmailPromotion' of type int, the default value is 0 of type class java.lang.Integer
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:101)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:253)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:237)
    at io.debezium.connector.sqlserver.SqlServerChangeEventSourceCoordinator.executeChangeEventSources(SqlServerChangeEventSourceCoordinator.java:82)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
    ... 5 more
Caused by: io.debezium.DebeziumException: io.debezium.DebeziumException: Failed to set field default value for 'aw2019.Person.Person.EmailPromotion' of type int, the default value is 0 of type class java.lang.Integer
    at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$3(RelationalSnapshotChangeEventSource.java:387)
    at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:367)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.createSchemaChangeEventsForTables(RelationalSnapshotChangeEventSource.java:382)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:154)
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)
    ... 9 more
Caused by: io.debezium.DebeziumException: Failed to set field default value for 'aw2019.Person.Person.EmailPromotion' of type int, the default value is 0 of type class java.lang.Integer
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:411)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:147)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:122)
    at io.debezium.connector.sqlserver.SqlServerDatabaseSchema.applySchemaChange(SqlServerDatabaseSchema.java:53)
    at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:622)
    at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$3(RelationalSnapshotChangeEventSource.java:384)
    ... 13 more
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid SchemaBuilder call: default has already been set.
    at org.apache.kafka.connect.data.SchemaBuilder.checkCanSet(SchemaBuilder.java:437)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:126)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:405)
    ... 28 more
2023-10-16 11:58:02,535 DEBUG [io.qua.run.Application] (main) Stopping application
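The innermost cause in the trace is Kafka Connect's `SchemaBuilder` rejecting a second `defaultValue` call on a builder that already has a default. One possible reading, not confirmed in this report, is that the converter registers the same `stringSchema` instance for every column, so an earlier column with a default succeeds and a later one (here `EmailPromotion`) hits the check. The sketch below reproduces only that set-once pattern with a hypothetical stand-in class, `SetOnceBuilder`; it is not Kafka Connect or Debezium code, and the column names and order are borrowed from the DDL purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a builder whose default may be set only once.
class SetOnceBuilder {
    private Object defaultValue;
    private boolean defaultSet;

    SetOnceBuilder defaultValue(Object value) {
        if (defaultSet) {
            throw new IllegalStateException("default has already been set.");
        }
        defaultValue = value;
        defaultSet = true;
        return this;
    }
}

class SharedBuilderSketch {

    // Sets one default per column and returns the columns for which that failed.
    static List<String> applyDefaults(boolean shareBuilder) {
        String[] columns = {"Title", "EmailPromotion"}; // assumed order, for illustration
        Object[] defaults = {"", "0"};
        SetOnceBuilder shared = new SetOnceBuilder();
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < columns.length; i++) {
            // Either reuse one builder for all columns or create one per column.
            SetOnceBuilder builder = shareBuilder ? shared : new SetOnceBuilder();
            try {
                builder.defaultValue(defaults[i]);
            } catch (IllegalStateException e) {
                failed.add(columns[i]);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("shared builder:     " + applyDefaults(true));
        System.out.println("builder per column: " + applyDefaults(false));
    }
}
```

Under that reading, handing out a fresh builder per column would avoid the shared state. Whether this matches the actual root cause or the eventual fix is not stated in the report.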
How to reproduce the issue using our tutorial deployment?
The tutorial does not cover the Debezium Server use case.