Debezium / DBZ-1767

DBREF fields cause SchemaParseException when using the New Record State Extraction SMT and the Avro converter


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version: 1.1.0.Beta2
    • Affects Version: 1.0.0.Final
    • Component: mongodb-connector
    • Labels: None

      When a field in a document holds a DBREF such as:

      "images":[
         {
            "image":{
               "$ref":"image",
               "$id":{
                  "$oid":"5ddfc7bd2ca640000191f0af"
               }
            },
            "logo":true
         }
      ]
      

      serializing the record with the ExtractNewDocumentState SMT and the Avro converter fails, because the $ref field (at least) is not sanitized into a legal Avro name:
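For context, a minimal connector configuration of the kind that triggers this (the connector name, hosts, server name, and registry URL below are placeholders, not taken from the report):

```json
{
  "name": "mongodb-source",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongodb:27017",
    "mongodb.name": "dbserver1",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
```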

      [2020-02-05 14:27:38,878] ERROR WorkerSourceTask{id=MongoDbConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.avro.SchemaParseException: Illegal initial character: $ref
              at org.apache.avro.Schema.validateName(Schema.java:1528)
              at org.apache.avro.Schema.access$400(Schema.java:87)
              at org.apache.avro.Schema$Field.<init>(Schema.java:520)
              at org.apache.avro.Schema$Field.<init>(Schema.java:559)
              at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1059)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
              at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1021)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:886)
              at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1058)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
              at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1021)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:886)
              at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1021)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:834)
              at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1058)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:731)
              at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:725)
              at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:364)
              at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
              at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
              at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
              at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
              ... 11 more
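The failure comes from Avro's naming rule (enforced in Schema.validateName, visible in the trace above): a name must match [A-Za-z_][A-Za-z0-9_]*, which $ref and $id violate. As an illustration only, not Debezium's actual implementation, a sanitizer of the kind the SMT needs would replace every illegal character with an underscore:

```python
import re

# Avro name rule: first character [A-Za-z_], remaining characters [A-Za-z0-9_]
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*\Z")

def sanitize_field_name(name: str) -> str:
    """Replace characters that are illegal in an Avro name with '_'.

    Assumes a non-empty field name, as produced from a BSON document.
    """
    if AVRO_NAME.fullmatch(name):
        return name
    first = name[0] if re.fullmatch(r"[A-Za-z_]", name[0]) else "_"
    rest = re.sub(r"[^A-Za-z0-9_]", "_", name[1:])
    return first + rest

print(sanitize_field_name("$ref"))   # -> _ref
print(sanitize_field_name("$id"))    # -> _id
print(sanitize_field_name("image"))  # -> image
```

Debezium connectors already expose a sanitize.field.names option that applies this style of replacement at the source; the fix for this issue presumably extends the same treatment to the fields emitted by the SMT.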
      

              Assignee: Unassigned
              Reporter: pierre bouvret