Debezium / DBZ-1315

Processing a MongoDB document that contains an UNDEFINED type field causes an exception in the MongoDB Unwrap SMT

Details

      1. Start the docker-compose example:
        https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt
      2. Insert a document that contains a field of type undefined:
        docker-compose exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory --eval "db.customers.insert([{ _id : NumberLong(\"1006\"), test : undefined }])"'
        
      3. The sink task crashes:
        org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:506)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
                at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
                at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
        Caused by: org.apache.kafka.connect.errors.DataException: test is not a valid field name
                at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
                at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
                at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:222)
                at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:45)
                at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.newRecord(UnwrapFromMongoDbEnvelope.java:273)
                at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:223)
                at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
                ... 14 more
        

    Description

      I tried to use the Unwrap SMT to migrate one of my MongoDB collections, and during the initial sync I got an exception. Looking at MongoDataConverter.java, I found that BsonType.UNDEFINED is not listed in the switch cases, so while updatedSchema is being built, a field of type BsonType.UNDEFINED is discarded. The problem is that convertFieldValue always puts the value into the Struct, even when no schema field exists for it. Because of this inconsistency, processing any record that contains a field of an unsupported type throws an exception; in the trace above, Struct.put rejects the dropped field with "DataException: test is not a valid field name".
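      The mismatch between the schema pass and the value pass can be sketched without any Kafka Connect dependency. This is a hypothetical model, not Debezium's code: plain maps stand in for the Connect Schema and Struct, the `Kind` enum and `UNDEFINED` sentinel stand in for BsonType, and the method names (`buildSchema`, `convertUnguarded`, `convertGuarded`) are made up for illustration. The guarded variant shows one possible fix, skipping fields that have no schema entry; mapping UNDEFINED to an optional null field could be another.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, dependency-free sketch of the schema/value mismatch.
// None of these names are Debezium or Kafka Connect APIs.
final class UndefinedFieldSketch {

    // Stand-in for a few BSON types (the real converter switches on org.bson.BsonType).
    enum Kind { INT64, STRING, UNDEFINED }

    // Sentinel standing in for a BSON undefined value.
    static final Object UNDEFINED = new Object();

    static Kind kindOf(Object value) {
        if (value == UNDEFINED) return Kind.UNDEFINED;
        if (value instanceof Long) return Kind.INT64;
        return Kind.STRING;
    }

    // Schema pass: any kind without a case label is silently skipped,
    // which is what the report says happens to UNDEFINED fields.
    static Map<String, String> buildSchema(Map<String, Object> doc) {
        Map<String, String> schema = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            switch (kindOf(e.getValue())) {
                case INT64:
                    schema.put(e.getKey(), "int64");
                    break;
                case STRING:
                    schema.put(e.getKey(), "string");
                    break;
                default:
                    break; // no schema field is created for this kind
            }
        }
        return schema;
    }

    // Mimics a strict Struct.put: names missing from the schema are rejected.
    static void put(Map<String, String> schema, Map<String, Object> struct, String name, Object value) {
        if (!schema.containsKey(name)) {
            throw new IllegalArgumentException(name + " is not a valid field name");
        }
        struct.put(name, value);
    }

    // Value pass as described in the report: every field is written,
    // even fields the schema pass dropped.
    static Map<String, Object> convertUnguarded(Map<String, Object> doc, Map<String, String> schema) {
        Map<String, Object> struct = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            put(schema, struct, e.getKey(), e.getValue());
        }
        return struct;
    }

    // One possible fix: only write fields the schema pass kept.
    static Map<String, Object> convertGuarded(Map<String, Object> doc, Map<String, String> schema) {
        Map<String, Object> struct = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (schema.containsKey(e.getKey())) {
                put(schema, struct, e.getKey(), e.getValue());
            }
        }
        return struct;
    }

    public static void main(String[] args) {
        // Mirrors the reproduction document: { _id : 1006, test : undefined }
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("_id", 1006L);
        doc.put("test", UNDEFINED);
        Map<String, String> schema = buildSchema(doc);
        System.out.println(convertGuarded(doc, schema)); // {_id=1006}
        try {
            convertUnguarded(doc, schema);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // test is not a valid field name
        }
    }
}
```

      Dropping the field keeps the record flowing but silently loses the key; whether that is acceptable for a given migration is a design decision the sketch does not settle.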

          People

            Assignee: Unassigned
            Reporter: Ching Tsai (ching.tsai, inactive)
            Votes: 0
            Watchers: 2