Details
-
Bug
-
Resolution: Not a Bug
-
Critical
-
None
-
1.7.0.Final
-
None
-
False
-
None
-
False
Description
We store events in a MongoDB collection, they get picked up by Debezium and send to predefined Kafka topics. We implemented an outbox pattern as described in the following article: https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/.
Recently we've encountered an issue where Debezium throws an error because it claims to not recognize a field. The unit field inside the nested array_B cannot be found.
{ "_id": { "$oid": "objectId" }, "data": { "_id": "id1", "value": { "array_A": [ { "type": "B", "state": "active", "array_B": [{ "maxValue": "10", "minValue": "1", "count": "50" }] }, { "type": "B", "state": "active", "array_B": [{ "maxValue": "10", "minValue": "1", "unit": kg }] }] } }, ... }
Caused by: org.apache.kafka.connect.errors.DataException: Failed to find field 'unit' in schema app.db.collection.data.array_A.array_B
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:144)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:265)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:189)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:181)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:151)
at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:60)
at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:324)
at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:264)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
Any help would be appreciated