Type: Bug
Resolution: Done
Priority: Major
What Debezium connector do you use and what version?
io.debezium.connector.mongodb.MongoDbConnector, 2.1.4.Final
What is the connector configuration?
{
  "collection.include.list": "<replace_me>",
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.name": "<replace_me>",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.tolerance": "all",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "mongodb.hosts": "<replace_me>",
  "mongodb.password": "<replace_me>",
  "mongodb.ssl.enabled": "true",
  "mongodb.user": "<replace_me>",
  "name": "<replace_me>",
  "tasks.max": "1",
  "topic.prefix": "<replace_me>",
  "transforms": "outbox",
  "transforms.outbox.collection.expand.json.payload": "true",
  "transforms.outbox.collection.field.event.key": "aggregateRootId",
  "transforms.outbox.route.by.field": "aggregateRootType",
  "transforms.outbox.route.topic.replacement": "<replace_me>.${routedByValue}",
  "transforms.outbox.type": "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
What is the captured database version and mode of deployment?
MongoDB 6.0, provided by MongoDB Atlas
What behavior do you expect?
The event is sent to Kafka. The outbox document payload that should be routed is:
{
  "parentarray": [
    {
      "stringfield": "stringvalue",
      "nestedarray": []
    }
  ]
}
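For completeness, the full outbox document of the failing record, as captured in the error log (the `_id` and `aggregateRootId` values are taken from the logged source record):

```json
{
  "_id": { "$oid": "67852029eed003c8505a256d" },
  "aggregateRootId": "4991ece5-b06a-40d9-996e-a76110db8c51",
  "aggregateRootType": "type",
  "version": 0,
  "payload": {
    "parentarray": [
      { "stringfield": "stringvalue", "nestedarray": [] }
    ]
  }
}
```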
What behavior do you see?
The connector produces an error, skips the record, and the event does not appear in Kafka.
Do you see the same behaviour using the latest released Debezium version?
Yes, the same behaviour occurs with 3.0.6.Final.
Do you have the connector logs, ideally from start till finish?
[2025-01-13 14:16:09,786] WARN [array|task-0] Failed to expand after field: nestedarray is not a valid field name (io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter:67)
org.apache.kafka.connect.errors.DataException: nestedarray is not a valid field name
    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:214)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60)
    at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.buildNewAfterStruct(MongoEventRouter.java:195)
    at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.expandAfterField(MongoEventRouter.java:124)
    at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.lambda$apply$0(MongoEventRouter.java:64)
    at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:113)
    at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.apply(MongoEventRouter.java:62)
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2025-01-13 14:16:09,788] ERROR [array|task-0] Error encountered in task array-0. Executing stage 'TRANSFORMATION' with class 'io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter', where source record is = SourceRecord{sourcePartition={rs=rs0, server_id=dev.private.app.fct}, sourceOffset={sec=1736777769, ord=1, transaction_id=null, resume_token=8267852029000000012B042C0100296E5A1004ABF65439FDCB46AD9CB53C276DBA0096463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F6964006467852029EED003C8505A256D000004}} ConnectRecord{topic='dev.private.app.fct.app.eventStore', kafkaPartition=null, key=Struct{id={"$oid": "67852029eed003c8505a256d"}}, keySchema=Schema{dev.private.app.fct.app.eventStore.Key:STRUCT}, value=Struct{after={"_id": {"$oid": "67852029eed003c8505a256d"},"aggregateRootId": "4991ece5-b06a-40d9-996e-a76110db8c51","aggregateRootType": "type","version": 0,"payload": {"parentarray": [{"stringfield": "stringvalue","nestedarray": []}]}},source=Struct{version=2.1.4.Final,connector=mongodb,name=dev.private.app.fct,ts_ms=1736777769000,db=app,rs=rs0,collection=eventStore,ord=1},op=c,ts_ms=1736777769545}, valueSchema=Schema{dev.private.app.fct.app.eventStore.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [Read Outbox Event], found: java.lang.String
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:120)
    at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.apply(MongoEventRouter.java:62)
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:54)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:401)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
How to reproduce the issue using our tutorial deployment?
A similar issue was closed before: https://issues.redhat.com/browse/DBZ-5434, but the fix provided there does not cover this scenario.
The parent array must contain an object that has both a string field and an empty array; with that input the conversion fails.
The issue can therefore be reproduced in the existing ExtractNewDocumentStateTestIT#shouldSupportNestedArrays test by updating its input accordingly (see attachment).
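For illustration only, here is a small plain-Python sketch of the suspected failure mode (this is not Debezium code; the function names are hypothetical). If schema inference drops a field whose value is an empty array, because an empty array carries no element-type information, then a later put of that field against the inferred schema fails with exactly the reported message:

```python
def infer_schema(doc):
    """Hypothetical schema inference: maps field names to type descriptions.

    Mimics the symptom: a field holding an empty array contributes no
    element type, so the field is dropped from the inferred schema.
    """
    schema = {}
    for key, value in doc.items():
        if isinstance(value, list):
            if not value:
                continue  # empty array: no element type, field dropped (the bug)
            first = value[0]
            schema[key] = [infer_schema(first)] if isinstance(first, dict) else [type(first).__name__]
        elif isinstance(value, dict):
            schema[key] = infer_schema(value)
        else:
            schema[key] = type(value).__name__
    return schema


def put(schema, key):
    """Hypothetical Struct.put: rejects fields missing from the schema."""
    if key not in schema:
        raise ValueError(f"{key} is not a valid field name")


payload = {"parentarray": [{"stringfield": "stringvalue", "nestedarray": []}]}
element_schema = infer_schema(payload)["parentarray"][0]

put(element_schema, "stringfield")  # present in the inferred schema, succeeds
try:
    put(element_schema, "nestedarray")  # was dropped during inference, fails
except ValueError as e:
    print(e)  # prints: nestedarray is not a valid field name
```

This models why the record with an empty `nestedarray` is rejected while records without the empty array pass; the actual mechanics inside MongoDataConverter may differ.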