-
Bug
-
Resolution: Done
-
Major
-
1.9.5.Final
-
None
-
False
-
None
-
False
What Debezium connector do you use and what version?
Debezium MongoDb Connector, 1.9.5.Final
What is the connector configuration?
key.converter.schemas.enable: false
value.converter.schemas.enable: false
offset.flush.interval.ms: 5000
mongodb.hosts: localhost:27017
mongodb.name: xxxx
mongodb.user: xxxx
mongodb.password: xxxx
mongodb.ssl.enabled: false
connector.class: io.debezium.connector.mongodb.MongoDbConnector
database.include.list: xxxx
collection.include.list: xxxx
connect.backoff.initial.delay.ms: 1000
connect.backoff.max.delay.ms: 10000
connect.max.attempts: 3
snapshot.fetch.size: 2000
What is the captured database version and mode of depoyment?
mongo:4.2
What behaviour do you expect?
MongoDB change event converted correctly and reached handleEvent method of the Debezium consumer.
What behaviour do you see?
Error in converting mongodb document to kafka connect schema.
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
For this document: { "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"), "planExecutionId" : "MJXINSMZ", "f1" : { "f2" : [ { "f3" : {} }, { "f3" : {"f5": 5} } ] } } I'm getting below error: ERROR io.debezium.embedded.EmbeddedEngine - Stopping connector after error in the application's handler method: f5 is not a valid field name org.apache.kafka.connect.errors.DataException: f5 is not a valid field name at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) at org.apache.kafka.connect.data.Struct.put(Struct.java:202) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:214) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265) at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60) at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:324) at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:264) at io.debezium.embedded.Transformations.transform(Transformations.java:74) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:814) at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) 2022-07-29 11:24:57,094 [debezium-controller-thread-0] INFO io.debezium.embedded.EmbeddedEngine - Stopping the embedded engine For this document: { "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"), "planExecutionId" : "MJXINSMZ", "f1" : { "f2" : [ { "f3" : [] }, { "f3" : [{"f5": 5}] } ] } } I'm getting below error: ERROR io.debezium.embedded.EmbeddedEngine - Stopping connector after error in the application's handler method: Not a struct schema: Schema{STRING} org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING} at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:263) at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265) at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151) at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60) at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:324) at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:264) at io.debezium.embedded.Transformations.transform(Transformations.java:74) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:814) at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829)
How to reproduce the issue using our tutorial deployment?
Use following mongo documents: 1. { "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"), "f1" : { "f2" : [ { "f3" : {} }, { "f3" : {"f5": 5} } ] } } 2. { "_id" : ObjectId("6182b1a25711ed59dd6a1d6c"), "f1" : { "f2" : [ { "f3" : [] }, { "f3" : [{"f5": 5}] } ] } }
Implementation Ideas (Optional)