Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-4031

Unable to set default AVRO schema value in a custom converter

    XMLWordPrintable

Details

    • Feature Request
    • Status: Open (View Workflow)
    • Major
    • Resolution: Unresolved
    • 1.6.0.Final
    • Backlog
    • core-library
    • None

    Description

      I am trying to implement a custom converter following this documentation https://debezium.io/documentation/reference/1.6/development/converters.html for my Debezium producer with AVRO schemas.

      Here is an example of a class that takes a varchar/text from a particular field and just modifies the schema a bit - https://gist.github.com/slvrtrn/f1f8506575713dd3f2b68bc559067f07

      There, as an example, I tried to set a default value

      SchemaBuilder schemaBuilder = SchemaBuilder.string().optional().defaultValue("test");

      However, if the source field had a default value defined in the database (MySQL), it is impossible to modify it from the code. I am getting the following exception:

       [2021-09-23 07:10:14,372] INFO Snapshot - Final stage (io.debezium.pipeline.source.AbstractSnapshotChangeEventSource)
      [2021-09-23 07:10:14,372] ERROR Producer failure (io.debezium.pipeline.ErrorHandler)
      io.debezium.DebeziumException: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid SchemaBuilder call: default has already been set.
              at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:79)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:125)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Invalid SchemaBuilder call: default has already been set.
              at org.apache.kafka.connect.data.SchemaBuilder.checkCanSet(SchemaBuilder.java:437)
              at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:126)
              at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:374)
              at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:119)
              at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
              at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
              at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
              at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
              at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
              at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
              at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
              at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
              at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
              at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:117)
              at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:130)
              at io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:171)
              at java.base/java.lang.Iterable.forEach(Iterable.java:75)
              at io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:171)
              at io.debezium.pipeline.EventDispatcher$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcher.java:535)
              at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.lambda$createSchemaChangeEventsForTables$7(MySqlSnapshotChangeEventSource.java:539)
              at io.debezium.pipeline.EventDispatcher.dispatchSchemaChangeEvent(EventDispatcher.java:304)
              at io.debezium.connector.mysql.MySqlSnapshotChangeEventSource.createSchemaChangeEventsForTables(MySqlSnapshotChangeEventSource.java:539)
              at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:126)
              at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:70)
              ... 6 more

      Attachments

        Activity

          People

            Unassigned Unassigned
            serge.klochkov Serge Klochkov (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: