-
Bug
-
Resolution: Done
-
Major
-
1.0.0.Final
-
None
When a field in a document holds DBREF such as :
"images":[ { "image":{ "$ref":"image", "$id":{ "$oid":"5ddfc7bd2ca640000191f0af" } }, "logo":true } ]
when using the ExtractNewDocumentState and Avro converter, it causes an error because of the $ref (at least) that is not properly sanitized
[2020-02-05 14:27:38,878] ERROR WorkerSourceTask{id=MongoDbConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.SchemaParseException: Illegal initial character: $ref
at org.apache.avro.Schema.validateName(Schema.java:1528)
at org.apache.avro.Schema.access$400(Schema.java:87)
at org.apache.avro.Schema$Field.<init>(Schema.java:520)
at org.apache.avro.Schema$Field.<init>(Schema.java:559)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1059)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1021)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:886)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1058)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1021)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:886)
at io.confluent.connect.avro.AvroData.fromConnectSchemaWithCycle(AvroData.java:1021)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:834)
at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1058)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:899)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:731)
at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:725)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:364)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more