Details
-
Feature Request
-
Resolution: Unresolved
-
Major
-
1.6.0.Final
-
None
-
False
-
False
-
0
-
0%
Description
I am trying to implement a custom converter following this documentation https://debezium.io/documentation/reference/1.6/development/converters.html for my Debezium producer with AVRO schemas.
Here is an example of a class that takes a varchar/text from a particular field and just modifies the schema a bit - https://gist.github.com/slvrtrn/f1f8506575713dd3f2b68bc559067f07
There, as an example, I tried to set a default value
SchemaBuilder schemaBuilder = SchemaBuilder.string().optional().defaultValue("test");
However, if the source field had a default value defined in the database (MySQL), it is impossible to modify it from the code. I am getting the following exception:
[2021-09-23 07:10:14,372] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource) [2021-09-23 07:10:14,372] ERROR Producer failure (io.debezium.pipeline.ErrorHandler) io.debezium.DebeziumException: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid SchemaBuilder call: default has already been set. at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:125) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid SchemaBuilder call: default has already been set. at org.apache.kafka.connect.data.SchemaBuilder.checkCanSet(SchemaBuilder.java:437) at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:126) at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117) at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130) at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171) at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:535) at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$7(MySqlSnapshotChangeEventSource.java:539) at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:304) at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.createSchemaChangeEventsForTables(MySqlSnapshotChangeEventSource.java:539) at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:126) at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70) ... 6 more