Debezium / DBZ-1315

Processing a MongoDB document that contains a field of type UNDEFINED causes an exception with the MongoDB Unwrap SMT

    Details

    • Steps to Reproduce:
      1. Start the docker-compose example:
        https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt
      2. Insert a document that contains a field of type undefined:
        docker-compose exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory --eval "db.customers.insert([{ _id : NumberLong(\"1006\"), test : undefined }])"'
        
      3. The sink task crashes with the following exception:
        org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
        Caused by: org.apache.kafka.connect.errors.DataException: test is not a valid field name
                at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
                at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
                at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:222)
                at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:45)
                at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.newRecord(UnwrapFromMongoDbEnvelope.java:273)
                at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:223)
                at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
                ... 14 more
        

      Description

      I tried to use the Unwrap SMT to migrate one of my MongoDB collections, and during the initial sync I got an exception. I had a look at MongoDataConverter.java and found that BsonType.UNDEFINED is not listed in the switch cases, so when the updated schema is built, a field of type BsonType.UNDEFINED is discarded. The problem is that convertFieldValue always puts the value into the Struct, even if no schema exists for that field. This inconsistency causes an exception whenever a record contains a field of an unsupported type.
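
      The mismatch described above can be illustrated with a minimal, self-contained sketch. It does not use the real Debezium or Kafka Connect classes: FieldType, StrictStruct, buildSchema, convertUnguarded and convertGuarded are hypothetical stand-ins that only model the behaviour reported here (a schema pass that skips unknown types, and a value pass that writes every field). The guard in convertGuarded is one possible way to keep the two passes consistent, not necessarily the fix the project would choose.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class UndefinedFieldSketch {

    // Hypothetical stand-in for BSON types; only three are modelled.
    enum FieldType { STRING, INT64, UNDEFINED }

    // Hypothetical stand-in for a Connect Struct: put() rejects
    // any field name that is not part of the schema.
    static final class StrictStruct {
        private final Set<String> schemaFields;
        private final Map<String, Object> values = new LinkedHashMap<>();

        StrictStruct(Set<String> schemaFields) {
            this.schemaFields = schemaFields;
        }

        void put(String name, Object value) {
            if (!schemaFields.contains(name)) {
                throw new IllegalArgumentException(name + " is not a valid field name");
            }
            values.put(name, value);
        }

        Map<String, Object> values() {
            return values;
        }
    }

    // Schema pass: a type without a case is silently left out of the schema.
    static Set<String> buildSchema(Map<String, FieldType> doc) {
        Set<String> fields = new LinkedHashSet<>();
        for (Map.Entry<String, FieldType> e : doc.entrySet()) {
            switch (e.getValue()) {
                case STRING:
                case INT64:
                    fields.add(e.getKey());
                    break;
                default:
                    break; // UNDEFINED ends up here and gets no schema entry
            }
        }
        return fields;
    }

    // Value pass as described in the report: writes every field unconditionally.
    static StrictStruct convertUnguarded(Map<String, FieldType> doc) {
        StrictStruct struct = new StrictStruct(buildSchema(doc));
        for (String name : doc.keySet()) {
            struct.put(name, "value-of-" + name);
        }
        return struct;
    }

    // One possible guard: skip fields that the schema pass did not register.
    static StrictStruct convertGuarded(Map<String, FieldType> doc) {
        Set<String> schema = buildSchema(doc);
        StrictStruct struct = new StrictStruct(schema);
        for (String name : doc.keySet()) {
            if (schema.contains(name)) {
                struct.put(name, "value-of-" + name);
            }
        }
        return struct;
    }

    public static void main(String[] args) {
        Map<String, FieldType> doc = new LinkedHashMap<>();
        doc.put("_id", FieldType.INT64);
        doc.put("test", FieldType.UNDEFINED);

        try {
            convertUnguarded(doc);
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded: " + e.getMessage());
        }
        System.out.println("guarded: " + convertGuarded(doc).values());
    }
}
```

      With the document from step 2 (an _id plus an undefined field named test), the unguarded path fails on the test field in the same way as the stack trace above, while the guarded path simply omits it.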

              People

              • Assignee:
                Unassigned
                Reporter:
                ching.tsai Ching Tsai
              • Votes:
                0
                Watchers:
                2
